diff --git a/GNUmakefile b/GNUmakefile index e760621..3715a8c 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -156,6 +156,8 @@ uhub_SOURCES := src/main.c adcrush_SOURCES := src/adcrush.c +admin_SOURCES := src/admin.c + uhub_HEADERS := \ src/adcconst.h \ src/auth.h \ @@ -200,12 +202,14 @@ autotest_OBJECTS = autotest.o libuhub_OBJECTS := $(libuhub_SOURCES:.c=.o) uhub_OBJECTS := $(uhub_SOURCES:.c=.o) adcrush_OBJECTS := $(adcrush_SOURCES:.c=.o) +admin_OBJECTS := $(admin_SOURCES:.c=.o) -all_OBJECTS := $(libuhub_OBJECTS) $(uhub_OBJECTS) $(adcrush_OBJECTS) $(autotest_OBJECTS) +all_OBJECTS := $(libuhub_OBJECTS) $(uhub_OBJECTS) $(adcrush_OBJECTS) $(autotest_OBJECTS) $(admin_OBJECTS) LIBUHUB=libuhub.a uhub_BINARY=uhub$(BIN_EXT) adcrush_BINARY=adcrush$(BIN_EXT) +admin_BINARY=uhub-admin$(BIN_EXT) autotest_BINARY=autotest/test$(BIN_EXT) %.o: %.c @@ -218,6 +222,10 @@ $(adcrush_BINARY): $(PCH) $(LIBUHUB) $(adcrush_OBJECTS) $(MSG_LD) $(CC) -o $@.tmp $(adcrush_OBJECTS) $(LIBUHUB) $(LDFLAGS) $(LDLIBS) && \ $(MV) $@.tmp $@ +$(admin_BINARY): $(PCH) $(LIBUHUB) $(admin_OBJECTS) + $(MSG_LD) $(CC) -o $@.tmp $(admin_OBJECTS) $(LIBUHUB) $(LDFLAGS) $(LDLIBS) && \ + $(MV) $@.tmp $@ + $(uhub_BINARY): $(PCH) $(LIBUHUB) $(uhub_OBJECTS) $(MSG_LD) $(CC) -o $@.tmp $(uhub_OBJECTS) $(LIBUHUB) $(LDFLAGS) $(LDLIBS) && \ $(MV) $@.tmp $@ diff --git a/src/auth.c b/src/auth.c index 3437600..8c15229 100644 --- a/src/auth.c +++ b/src/auth.c @@ -498,7 +498,7 @@ const char* acl_password_generate_challenge(struct acl_handle* acl, struct user* uint64_t tiger_res[3]; static char tiger_buf[MAX_CID_LEN+1]; - snprintf(buf, 32, "%d%d%d", (int) user->tm_connected, (int) user->id.sid, (int) user->sd); + snprintf(buf, 32, "%d%d%d", (int) user->net.tm_connected, (int) user->id.sid, (int) user->net.sd); tiger((uint64_t*) buf, strlen(buf), (uint64_t*) tiger_res); base32_encode((unsigned char*) tiger_res, TIGERSIZE, tiger_buf); diff --git a/src/commands.c b/src/commands.c index a94b93b..4eaf7e9 100644 --- a/src/commands.c +++ b/src/commands.c @@ -178,7 +178,7 @@ static int command_version(struct hub_info* hub, struct user* user, const char* static int command_myip(struct hub_info* hub, struct user* user, const char* message) { char tmp[128]; - snprintf(tmp, 128, "Your IP is \"%s\"", ip_convert_to_string(&user->ipaddr)); + snprintf(tmp, 128, "Your IP is \"%s\"", ip_convert_to_string(&user->net.ipaddr)); return command_status(hub, user, "myip", tmp); } diff --git a/src/hub.c b/src/hub.c index ce03de7..8fbe006 100644 --- a/src/hub.c +++ b/src/hub.c @@ -21,6 +21,8 @@ struct hub_info* g_hub = 0; +#define NETWORK_DUMP_DEBUG 1 + int hub_handle_message(struct hub_info* hub, struct user* u, const char* line, size_t length) { int ret = 0; @@ -144,8 +146,8 @@ int hub_handle_support(struct hub_info* hub, struct user* u, struct adc_message* if (ok) { hub_send_handshake(hub, u); - if (u->ev_read) - event_add(u->ev_read, &timeout); + if (u->net.ev_read) + event_add(u->net.ev_read, &timeout); } else { @@ -912,15 +914,15 @@ void hub_disconnect_user(struct hub_info* hub, struct user* user, int reason) /* dont read more data from this user */ /* FIXME: Remove this from here! */ - if (user->ev_read) + if (user->net.ev_read) { - event_del(user->ev_read); - hub_free(user->ev_read); - user->ev_read = 0; + event_del(user->net.ev_read); + hub_free(user->net.ev_read); + user->net.ev_read = 0; } /* this should be enough? */ - net_shutdown_r(user->sd); + net_shutdown_r(user->net.sd); hub_log(log_trace, "hub_disconnect_user(), user=%p, reason=%d, state=%d", user, reason, user->state); diff --git a/src/hubevent.c b/src/hubevent.c index 1c6b226..bfb1773 100644 --- a/src/hubevent.c +++ b/src/hubevent.c @@ -22,26 +22,26 @@ static void log_user_login(struct user* u) { const char* cred = get_user_credential_string(u->credentials); - const char* addr = ip_convert_to_string(&u->ipaddr); + const char* addr = ip_convert_to_string(&u->net.ipaddr); hub_log(log_user, "LoginOK %s/%s %s \"%s\" (%s) \"%s\"", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, cred, u->user_agent); } static void log_user_login_error(struct user* u, enum status_message msg) { - const char* addr = ip_convert_to_string(&u->ipaddr); + const char* addr = ip_convert_to_string(&u->net.ipaddr); const char* message = hub_get_status_message_log(u->hub, msg); hub_log(log_user, "LoginError %s/%s %s \"%s\" (%s) \"%s\"", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, message, u->user_agent); } static void log_user_logout(struct user* u, const char* message) { - const char* addr = ip_convert_to_string(&u->ipaddr); + const char* addr = ip_convert_to_string(&u->net.ipaddr); hub_log(log_user, "Logout %s/%s %s \"%s\" (%s)", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, message); } static void log_user_nick_change(struct user* u, const char* nick) { - const char* addr = ip_convert_to_string(&u->ipaddr); + const char* addr = ip_convert_to_string(&u->net.ipaddr); hub_log(log_user, "NickChange %s/%s %s \"%s\" -> \"%s\"", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, nick); } @@ -71,8 +71,8 @@ void on_login_success(struct hub_info* hub, struct user* u) hub_send_motd(hub, u); /* reset to idle timeout */ - if (u->ev_read) - event_add(u->ev_read, &timeout); + if (u->net.ev_read) + event_add(u->net.ev_read, &timeout); } void on_login_failure(struct hub_info* hub, struct user* u, enum status_message msg) diff --git a/src/hubio.c b/src/hubio.c index 964d915..a17847e 100644 --- a/src/hubio.c +++ b/src/hubio.c @@ -21,71 +21,138 @@ #include "hubio.h" -struct hub_iobuf* hub_iobuf_create(size_t max_size) +struct hub_recvq* hub_recvq_create() { - struct hub_iobuf* buf = hub_malloc_zero(sizeof(struct hub_iobuf)); - if (buf) - { - buf->buf = hub_malloc(max_size); - buf->capacity = max_size; - } - return buf; + struct hub_recvq* q = hub_malloc_zero(sizeof(struct hub_recvq)); + q->buf = hub_malloc(MAX_RECV_BUF); + + return q; } -void hub_iobuf_destroy(struct hub_iobuf* buf) +void hub_recvq_destroy(struct hub_recvq* q) { - if (buf) + if (q) { - hub_free(buf->buf); - hub_free(buf); + hub_free(q->buf); + hub_free(q); } } -int hub_iobuf_recv(struct hub_iobuf* buf, hub_iobuf_read r, void* data) +size_t hub_recvq_get(struct hub_recvq* q, void* buf, size_t bufsize) { - int size = r(data, &buf->buf[buf->offset], buf->capacity - buf->offset); - if (size > 0) + assert(bufsize >= q->size); + if (q->size) { - buf->size += size; + size_t n = q->size; + memcpy(buf, q->buf, n); + hub_free(q->buf); + q->buf = 0; + q->size = 0; + return n; } - return size; + return 0; } -int hub_iobuf_send(struct hub_iobuf* buf, hub_iobuf_write w, void* data) +size_t hub_recvq_set(struct hub_recvq* q, void* buf, size_t bufsize) { - int size = w(data, &buf->buf[buf->offset], buf->size - buf->offset); - if (size > 0) + if (!bufsize) + return 0; + + if (q->buf) { - buf->offset += size; - } - return size; -} - -char* hub_iobuf_getline(struct hub_iobuf* buf, size_t* offset, size_t* len, size_t max_size) -{ - size_t x = *offset; - char* pos = memchr(&buf->buf[x], '\n', (buf->size - x)); - - if (pos) - { - *len = &pos[0] - &buf->buf[x]; - pos[0] = '\0'; - pos = &buf->buf[x]; - (*offset) += (*len + 1); - } - return pos; -} - -void hub_iobuf_remove(struct hub_iobuf* buf, size_t n) -{ - assert(buf); - assert(n <= buf->size); - - buf->offset = 0; - - if (n == buf->size) - { - buf->size = 0; + hub_free(q->buf); + q->buf = 0; + q->size = 0; } + q->buf = hub_malloc(bufsize); + if (!q->buf) + return 0; + + q->size = bufsize; + memcpy(q->buf, buf, bufsize); + return bufsize; +} + + +struct hub_sendq* hub_sendq_create() +{ + struct hub_sendq* q = hub_malloc_zero(sizeof(struct hub_sendq)); + if (!q) + return 0; + + q->queue = list_create(); + if (!q->queue) + { + hub_free(q); + return 0; + } + + return q; +} + +static void clear_send_queue_callback(void* ptr) +{ + adc_msg_free((struct adc_message*) ptr); +} + +void hub_sendq_destroy(struct hub_sendq* q) +{ + if (q) + { + list_clear(q->queue, &clear_send_queue_callback); + list_destroy(q->queue); + hub_free(q); + } +} + +void hub_sendq_add(struct hub_sendq* q, struct adc_message* msg_) +{ + struct adc_message* msg = adc_msg_incref(msg_); + list_append(q->queue, msg); + q->size += msg->length; +} + +void hub_sendq_remove(struct hub_sendq* q, struct adc_message* msg) +{ + list_remove(q->queue, msg); + adc_msg_free(msg); + q->size -= msg->length; + q->offset = 0; +} + +int hub_sendq_send(struct hub_sendq* q, hub_recvq_write w, void* data) +{ + int ret = 0; + int bytes_sent = 0; + + struct adc_message* msg = list_get_first(q->queue); + while (msg) + { + size_t len = msg->length - q->offset; + ret = w(data, &msg->cache[q->offset], len); + + if (ret <= 0) break; + + q->offset += ret; + bytes_sent += ret; + + if (q->offset < msg->length) + break; + + hub_sendq_remove(q, msg); + msg = list_get_first(q->queue); + } + + return bytes_sent; +} + +int hub_sendq_is_empty(struct hub_sendq* q) +{ + return q->size == 0; +} + +size_t hub_sendq_get_bytes(struct hub_sendq* q) +{ + return q->size - q->offset; } diff --git a/src/hubio.h b/src/hubio.h index 7f928ca..f020cb9 100644 --- a/src/hubio.h +++ b/src/hubio.h @@ -20,53 +20,85 @@ #ifndef HAVE_UHUB_HUB_IO_H #define HAVE_UHUB_HUB_IO_H -/* - * Used as a basis for receive queue, and send queue. - */ -struct hub_iobuf +struct adc_message; +struct linked_list; +typedef int (*hub_recvq_write)(void* desc, const void* buf, size_t len); +typedef int (*hub_recvq_read)(void* desc, void* buf, size_t len); + +struct hub_sendq { - char* buf; - size_t offset; - size_t size; - size_t capacity; + size_t size; /** Size of send queue (in bytes, not messages) */ + size_t offset; /** Queue byte offset in the first message. Should be 0 unless a partial write. */ + struct linked_list* queue; /** List of queued messages */ }; -typedef int (*hub_iobuf_write)(void* desc, const void* buf, size_t len); -typedef int (*hub_iobuf_read)(void* desc, void* buf, size_t len); +struct hub_recvq +{ + char* buf; + size_t size; +}; + +/** + * Create a send queue + */ +extern struct hub_sendq* hub_sendq_create(); + +/** + * Destroy a send queue, and delete any queued messages. + */ +extern void hub_sendq_destroy(struct hub_sendq*); + +/** + * Add a message to the send queue. + */ +extern void hub_sendq_add(struct hub_sendq*, struct adc_message* msg); + +/** + * Process the send queue, and send as many messages as possible. + * @returns the number of bytes sent. + * FIXME: send error not handled here! + */ +extern int hub_sendq_send(struct hub_sendq*, hub_recvq_write, void* data); + +/** + * @returns 1 if send queue is empty, 0 otherwise. + */ +extern int hub_sendq_is_empty(struct hub_sendq*); + +/** + * @returns the number of bytes remaining to be sent in the queue. + */ +extern size_t hub_sendq_get_bytes(struct hub_sendq*); + /** - * Create and initialize a io buffer + * Create a receive queue. */ -extern struct hub_iobuf* hub_iobuf_create(size_t max_size); +extern struct hub_recvq* hub_recvq_create(); /** - * Destroy an io buffer. + * Destroy a receive queue. */ -extern void hub_iobuf_destroy(struct hub_iobuf*); +extern void hub_recvq_destroy(struct hub_recvq*); /** - * net_read() from a socket descriptor into a buffer. - * @return value from net_recv() + * Gets the buffer, copies it into buf and deallocates it. + * NOTE: bufsize *MUST* be larger than the buffer, otherwise it asserts. + * @return the number of bytes copied into buf. */ -extern int hub_iobuf_recv(struct hub_iobuf*, hub_iobuf_read, void* data); +extern size_t hub_recvq_get(struct hub_recvq*, void* buf, size_t bufsize); /** - * net_send() data from a buffer to a socket descriptor. - * @return value from net_send() + * Sets the buffer */ -extern int hub_iobuf_send(struct hub_iobuf*, hub_iobuf_write, void* data); +extern size_t hub_recvq_set(struct hub_recvq*, void* buf, size_t bufsize); /** - * Get a line from the buffer + * @return 1 if size is zero, 0 otherwise. */ -extern char* hub_iobuf_getline(struct hub_iobuf*, size_t* offset, size_t* length, size_t max_size); +extern int hub_recvq_is_empty(struct hub_recvq* buf); + -/** - * Removes the first 'n' bytes from the buffer. - * This will reset the offset and size parameters. - */ -extern void hub_iobuf_remove(struct hub_iobuf* buf, size_t n); #endif /* HAVE_UHUB_HUB_IO_H */ - diff --git a/src/inf.c b/src/inf.c index 82a9de4..c64f397 100644 --- a/src/inf.c +++ b/src/inf.c @@ -186,7 +186,7 @@ static int check_required_login_flags(struct hub_info* hub, struct user* user, s */ int check_network(struct hub_info* hub, struct user* user, struct adc_message* cmd) { - const char* address = ip_convert_to_string(&user->ipaddr); + const char* address = ip_convert_to_string(&user->net.ipaddr); /* Check for NAT override address */ if (acl_is_ip_nat_override(hub->acl, address)) diff --git a/src/netevent.c b/src/netevent.c index f7f396e..419fa3b 100644 --- a/src/netevent.c +++ b/src/netevent.c @@ -26,32 +26,77 @@ extern struct hub_info* g_hub; int net_user_send(void* ptr, const void* buf, size_t len) { struct user* user = (struct user*) ptr; - int ret = net_send(user->sd, buf, len, UHUB_SEND_SIGNAL); - printf("net_user_send: %d/%d bytes\n", ret, (int) len); - if (ret == -1) - { - printf(" errno: %d - %s\n", errno, strerror(errno)); - } + int ret = net_send(user->net.sd, buf, len, UHUB_SEND_SIGNAL); + if (ret > 0) + { + user->net.tm_last_write = time(NULL); + } + else if (ret == -1 && net_error() == EWOULDBLOCK) + { + return -2; + } + else + { + // user->close_flag = quit_socket_error; + return 0; + } return ret; } int net_user_recv(void* ptr, void* buf, size_t len) { struct user* user = (struct user*) ptr; - int ret = net_recv(user->sd, buf, len, 0); + int ret = net_recv(user->net.sd, buf, len, 0); + + hub_log(log_debug, "net_user_recv: sd=%d, len=%d/%d", user->net.sd, ret, (int) len); + + if (ret > 0) + { + user->net.tm_last_read = time(NULL); + } + + +#ifdef DEBUG_SENDQ printf("net_user_recv: %d/%d bytes\n", ret, (int) len); if (ret == -1) { printf(" errno: %d - %s\n", errno, strerror(errno)); } +#endif return ret; } +/** + * @param buf buffer to extract line from + * @param bufsize size of buffer + * @param offset (in/out) offset into buffer + * @param len (out) length of the line returned + * @param max_size maximum length of line, if line is longer, it is discarded. + * + * @return line from buffer, or NULL if no line can be returned. + */ +static char* extract_line(char* buf, size_t bufsize, size_t* offset, size_t* len, size_t max_size) +{ + size_t x = *offset; + char* pos = memchr(&buf[x], '\n', (bufsize - x)); + if (pos) + { + *len = &pos[0] - &buf[x]; + pos[0] = '\0'; + pos = &buf[x]; + (*offset) += (*len + 1); + } + return pos; +} + void net_on_read(int fd, short ev, void *arg) { + static char buf[MAX_RECV_BUF]; struct user* user = (struct user*) arg; + struct hub_recvq* q = user->net.recv_queue; + size_t buf_size; int more = 1; int flag_close = 0; @@ -71,9 +116,16 @@ void net_on_read(int fd, short ev, void *arg) } } + buf_size = hub_recvq_get(q, buf, MAX_RECV_BUF); + for (;;) { - ssize_t size = hub_iobuf_recv(user->recv_buf, net_user_recv, user); + int size = net_user_recv(user, &buf[buf_size], MAX_RECV_BUF - buf_size); + if (size > 0) + { + buf_size += size; + } + if (size == -1) { if (net_error() != EWOULDBLOCK) @@ -91,15 +143,16 @@ void net_on_read(int fd, short ev, void *arg) size_t length; char* line = 0; - while ((line = hub_iobuf_getline(user->recv_buf, &offset, &length, g_hub->config->max_recv_buffer))) + while ((line = extract_line(buf, buf_size, &offset, &length, g_hub->config->max_recv_buffer))) { + puts(line); if (hub_handle_message(g_hub, user, line, length) == -1) { flag_close = quit_protocol_error; break; } } - hub_iobuf_remove(user->recv_buf, offset); + hub_recvq_set(q, buf+offset, buf_size); } } @@ -111,18 +164,18 @@ void net_on_read(int fd, short ev, void *arg) if (user_is_logged_in(user)) { - if (user->ev_read) + if (user->net.ev_read) { struct timeval timeout = { TIMEOUT_IDLE, 0 }; - event_add(user->ev_read, &timeout); + event_add(user->net.ev_read, &timeout); } } else if (user_is_connecting(user)) { - if (user->ev_read) + if (user->net.ev_read) { struct timeval timeout = { TIMEOUT_HANDSHAKE, 0 }; - event_add(user->ev_read, &timeout); + event_add(user->net.ev_read, &timeout); } } } @@ -131,89 +184,27 @@ void net_on_read(int fd, short ev, void *arg) void net_on_write(int fd, short ev, void *arg) { struct user* user = (struct user*) arg; - struct adc_message* msg; - int ret; - int length; - int close_flag = 0; - - msg = list_get_first(user->send_queue); - while (msg) + int sent = 0; + + for (;;) { - length = msg->length - user->send_queue_offset; - ret = net_send(user->sd, &msg->cache[user->send_queue_offset], length, UHUB_SEND_SIGNAL); - - if (ret == 0 || (ret == -1 && net_error() == EWOULDBLOCK)) - { - close_flag = 0; - break; - } - else if (ret > 0) - { - - user->tm_last_write = time(NULL); - - if (ret == length) - { -#ifdef DEBUG_SENDQ - hub_log(log_error, "SENDQ: sent=%d bytes/%d (all), send_queue_size=%d, offset=%d", ret, (int) msg->length, user->send_queue_size, user->send_queue_offset); -#endif - user->send_queue_size -= ret; - user->send_queue_offset = 0; - -#ifdef DEBUG_SENDQ - if ((user->send_queue_size < 0) || (user->send_queue_offset < 0)) - { - hub_log(log_error, "INVALID: send_queue_size=%d, send_queue_offset=%d", user->send_queue_size, user->send_queue_offset); - } -#endif - - list_remove(user->send_queue, msg); - - if (user_flag_get(user, flag_user_list) && (msg == user->info || user->send_queue_size == 0)) - { - user_flag_unset(user, flag_user_list); - } - - adc_msg_free(msg); - msg = 0; - - if (user->send_queue_size == 0) - break; - } - else - { -#ifdef DEBUG_SENDQ - hub_log(log_error, "SENDQ: sent=%d bytes/%d (part), send_queue_size=%d, offset=%d", ret, (int) msg->length, user->send_queue_size, user->send_queue_offset); -#endif - user->send_queue_size -= ret; - user->send_queue_offset += ret; - -#ifdef DEBUG_SENDQ - if ((user->send_queue_size < 0) || (user->send_queue_offset < 0) || (user->send_queue_offset > msg->length)) - { - hub_log(log_error, "INVALID: send_queue_size=%d, send_queue_offset=%d", user->send_queue_size, user->send_queue_offset); - } -#endif - break; - } - } + int ret = hub_sendq_send(user->net.send_queue, net_user_send, user); + if (ret > 0) + sent += ret; else - { - close_flag = quit_socket_error; break; - } - msg = list_get_first(user->send_queue); } - - + +#if 0 if (close_flag) { hub_disconnect_user(g_hub, user, close_flag); } else +#endif + if (hub_sendq_get_bytes(user->net.send_queue)) { - if (user->send_queue_size > 0 && user->ev_write) - event_add(user->ev_write, NULL); + user_want_write(user); } } @@ -264,16 +255,16 @@ void net_on_accept(int server_fd, short ev, void *arg) } /* Store IP address in user object */ - memcpy(&user->ipaddr, &ipaddr, sizeof(ipaddr)); + memcpy(&user->net.ipaddr, &ipaddr, sizeof(ipaddr)); net_set_nonblocking(fd, 1); net_set_nosigpipe(fd, 1); - event_set(user->ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user); - event_set(user->ev_write, fd, EV_WRITE, net_on_write, user); - event_base_set(hub->evbase, user->ev_read); - event_base_set(hub->evbase, user->ev_write); - event_add(user->ev_read, &timeout); + event_set(user->net.ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user); + event_set(user->net.ev_write, fd, EV_WRITE, net_on_write, user); + event_base_set(hub->evbase, user->net.ev_read); + event_base_set(hub->evbase, user->net.ev_write); + event_add(user->net.ev_read, &timeout); } } diff --git a/src/netevent.h b/src/netevent.h index 38e162f..dc57224 100644 --- a/src/netevent.h +++ b/src/netevent.h @@ -40,5 +40,8 @@ extern void net_on_read_timeout(int fd, short ev, void* arg); */ extern void net_on_accept(int fd, short ev, void *arg); + + + #endif /* HAVE_UHUB_NET_EVENT_H */ diff --git a/src/network.h b/src/network.h index d1e179c..1daf881 100644 --- a/src/network.h +++ b/src/network.h @@ -30,6 +30,17 @@ struct net_statistics size_t errors; }; +struct net_socket_t; + +#define NET_WANT_READ 0x01 +#define NET_WANT_WRITE 0x02 +#define NET_WANT_ACCEPT 0x08 +#define NET_WANT_SSL_READ 0x10 +#define NET_WANT_SSL_WRITE 0x20 +#define NET_WANT_SSL_ACCEPT 0x40 +#define NET_WANT_SSL_CONNECT 0x40 +#define NET_WANT_SSL_X509_LOOKUP 0x80 + /** * Initialize the socket monitor subsystem. * On some operating systems this will also involve loading the TCP/IP stack @@ -214,11 +225,6 @@ extern void net_stats_get(struct net_statistics** intermediate, struct net_stati #if defined(WINSOCK) && !defined(__CYGWIN__) -// #define EINTR WSAEINTR -// #define EACCES WSAEACCES -// #define EFAULT WSAEFAULT -// #define EINVAL WSAEINVAL -// #define EMFILE WSAEMFILE #define EWOULDBLOCK WSAEWOULDBLOCK #define EINPROGRESS WSAEINPROGRESS #define EALREADY WSAEALREADY @@ -247,10 +253,8 @@ extern void net_stats_get(struct net_statistics** intermediate, struct net_stati #define ETIMEDOUT WSAETIMEDOUT #define ECONNREFUSED WSAECONNREFUSED #define ELOOP WSAELOOP -// #define ENAMETOOLONG WSAENAMETOOLONG #define EHOSTDOWN WSAEHOSTDOWN #define EHOSTUNREACH WSAEHOSTUNREACH -// #define ENOTEMPTY WSAENOTEMPTY #define EPROCLIM WSAEPROCLIM #define EUSERS WSAEUSERS #define EDQUOT WSAEDQUOT diff --git a/src/route.c b/src/route.c index 92a7e83..2f2a404 100644 --- a/src/route.c +++ b/src/route.c @@ -58,23 +58,6 @@ int route_message(struct hub_info* hub, struct user* u, struct adc_message* msg) } -static void queue_command(struct user* user, struct adc_message* msg__, int offset) -{ - struct adc_message* msg = adc_msg_incref(msg__); - list_append(user->send_queue, msg); - -#ifdef DEBUG_SENDQ - hub_log(log_trace, "SENDQ: user=%p, msg=%p (%zu), offset=%d, length=%d, total_length=%d", user, msg, msg->references, offset, msg->length, user->send_queue_size); -#endif - - user->send_queue_size += msg->length - offset; - if (list_size(user->send_queue) == 1) - { - user->send_queue_offset = offset; - user->tm_last_write = time(NULL); - } -} - // #define ALWAYS_QUEUE_MESSAGES static size_t get_max_send_queue(struct hub_info* hub) { @@ -84,13 +67,12 @@ static size_t get_max_send_queue(struct hub_info* hub) return hub->config->max_send_buffer; } +#if 0 static size_t get_max_send_queue_soft(struct hub_info* hub) { return hub->config->max_send_buffer_soft; } - - /* * @return 1 if send queue is OK. * -1 if send queue is overflowed @@ -101,76 +83,40 @@ static int check_send_queue(struct user* user, struct adc_message* msg) if (user_flag_get(user, flag_user_list)) return 1; - if ((user->send_queue_size + msg->length) > get_max_send_queue(user->hub)) + if ((user->net.send_queue->size + msg->length) > get_max_send_queue(user->hub)) return -1; - if (user->send_queue_size > get_max_send_queue_soft(user->hub) && msg->priority < 0) + if (user->net.send_queue->size > get_max_send_queue_soft(user->hub) && msg->priority < 0) return 0; return 1; } +#endif + int route_to_user(struct hub_info* hub, struct user* user, struct adc_message* msg) { - int ret; - #if LOG_SEND_MESSAGES_WHEN_ROUTED char* data = strndup(msg->cache, msg->length-1); hub_log(log_protocol, "send %s: %s", sid_to_string(user->sid), data); free(data); #endif -#ifndef ALWAYS_QUEUE_MESSAGES - if (user->send_queue_size == 0 && !user_is_disconnecting(user)) + int empty = hub_sendq_is_empty(user->net.send_queue); + hub_sendq_add(user->net.send_queue, msg); + + if (empty) { - ret = net_send(user->sd, msg->cache, msg->length, UHUB_SEND_SIGNAL); - - if (ret == msg->length) - { - return 1; - } - - if (ret >= 0 || (ret == -1 && net_error() == EWOULDBLOCK)) - { - queue_command(user, msg, ret); - - if (user->send_queue_size && user->ev_write) - event_add(user->ev_write, NULL); - } - else - { - /* A socket error occured */ - hub_disconnect_user(hub, user, quit_socket_error); - return 0; - } + // try oportunistic write } else -#endif { - ret = check_send_queue(user, msg); - if (ret == -1) - { - /* User is not able to swallow the data, let's cut our losses and disconnect. */ - hub_disconnect_user(hub, user, quit_send_queue); - } - else if (ret == 1) - { - /* queue command */ - queue_command(user, msg, 0); - if (user->ev_write) - event_add(user->ev_write, NULL); - - } - else - { - /* do not queue command as our soft-limits are exceeded */ - } + user_want_write(user); } - + return 1; } - int route_to_all(struct hub_info* hub, struct adc_message* command) /* iterate users */ { struct user* user = (struct user*) list_get_first(hub->users->list); @@ -183,7 +129,6 @@ int route_to_all(struct hub_info* hub, struct adc_message* command) /* iterate u return 0; } - int route_to_subscribers(struct hub_info* hub, struct adc_message* command) /* iterate users */ { int do_send; @@ -243,7 +188,7 @@ int route_info_message(struct hub_info* hub, struct user* u) else { struct adc_message* cmd = adc_msg_copy(u->info); - const char* address = ip_convert_to_string(&u->ipaddr); + const char* address = ip_convert_to_string(&u->net.ipaddr); struct user* user = 0; adc_msg_remove_named_argument(cmd, ADC_INF_FLAG_IPV4_ADDR); diff --git a/src/user.c b/src/user.c index cdfbae8..b48f32c 100644 --- a/src/user.c +++ b/src/user.c @@ -30,71 +30,51 @@ struct user* user_create(struct hub_info* hub, int sd) if (user == NULL) return NULL; /* OOM */ - user->ev_write = hub_malloc_zero(sizeof(struct event)); - user->ev_read = hub_malloc_zero(sizeof(struct event)); + user->net.ev_write = hub_malloc_zero(sizeof(struct event)); + user->net.ev_read = hub_malloc_zero(sizeof(struct event)); - if (!user->ev_write || !user->ev_read) + if (!user->net.ev_write || !user->net.ev_read) { - hub_free(user->ev_read); - hub_free(user->ev_write); + hub_free(user->net.ev_read); + hub_free(user->net.ev_write); hub_free(user); return NULL; } - user->sd = sd; - user->tm_connected = time(NULL); - - user->send_queue = list_create(); - - user->send_buf = hub_iobuf_create(MAX_SEND_BUF); - user->recv_buf = hub_iobuf_create(MAX_RECV_BUF); + user->net.sd = sd; + user->net.tm_connected = time(NULL); + user->net.send_queue = hub_sendq_create(); + user->net.recv_queue = hub_recvq_create(); user_set_state(user, state_protocol); return user; } -static void clear_send_queue_callback(void* ptr) -{ - adc_msg_free((struct adc_message*) ptr); -} void user_destroy(struct user* user) { hub_log(log_trace, "user_destroy(), user=%p", user); - if (user->ev_write) + if (user->net.ev_write) { - event_del(user->ev_write); - hub_free(user->ev_write); - user->ev_write = 0; + event_del(user->net.ev_write); + hub_free(user->net.ev_write); + user->net.ev_write = 0; } - if (user->ev_read) + if (user->net.ev_read) { - event_del(user->ev_read); - hub_free(user->ev_read); - user->ev_read = 0; + event_del(user->net.ev_read); + hub_free(user->net.ev_read); + user->net.ev_read = 0; } - net_close(user->sd); + hub_recvq_destroy(user->net.recv_queue); + hub_sendq_destroy(user->net.send_queue); + net_close(user->net.sd); adc_msg_free(user->info); user_clear_feature_cast_support(user); - - if (user->recv_buf) - { - hub_free(user->recv_buf); - } - - if (user->send_queue) - { - list_clear(user->send_queue, &clear_send_queue_callback); - list_destroy(user->send_queue); - } - - user->send_buf = hub_iobuf_create(MAX_SEND_BUF); - user->recv_buf = hub_iobuf_create(MAX_RECV_BUF); - hub_free(user); } @@ -337,5 +317,21 @@ int user_is_registered(struct user* user) return 0; } +void user_want_write(struct user* user) +{ + if (user && user->net.ev_write) + { + event_add(user->net.ev_write, 0); + } +} + +void user_want_read(struct user* user, int timeout_s) +{ + struct timeval timeout = { timeout_s, 0 }; + if (user && user->net.ev_read) + { + event_add(user->net.ev_read, &timeout); + } +} diff --git a/src/user.h b/src/user.h index 4a0b9ab..53709c9 100644 --- a/src/user.h +++ b/src/user.h @@ -33,7 +33,6 @@ enum user_state state_disconnected = 5, /**<< "User is disconnected" */ }; - enum user_flags { feature_base = 0x00000001, /** BASE: Basic configuration (required by all clients) */ @@ -54,7 +53,6 @@ enum user_flags flag_nat = 0x40000000, /** nat override enabled */ }; - enum user_quit_reason { quit_unknown = 0, @@ -71,7 +69,6 @@ enum user_quit_reason quit_ghost_timeout = 11, /** The user is a ghost, and trying to login from another connection */ }; - struct user_info { sid_t sid; /** session ID */ @@ -84,7 +81,7 @@ struct user_info * as the number of bytes and files shared, and the number of hubs the * user is connected to, etc. */ -struct user_counts +struct user_limits { uint64_t shared_size; /** Shared size in bytes */ size_t shared_files; /** The number of shared files */ @@ -95,36 +92,40 @@ struct user_counts size_t hub_count_total; /** The number of hubs connected to in total */ }; -struct user +struct user_net_io { int sd; /** socket descriptor */ struct event* ev_read; /** libevent struct for read events */ struct event* ev_write; /** libevent struct for write events */ + + struct hub_recvq* recv_queue; + struct hub_sendq* send_queue; + + time_t tm_connected; /** time when user connected */ + time_t tm_last_read; /** time the user last received something from the hub */ + time_t tm_last_write; /** time the user last sent something to the hub */ + + struct ip_addr_encap ipaddr; /** IP address of connected user */ + +#ifdef SSL_SUPPORT + SSL* ssl; /** SSL handle */ +#endif /* SSL_SUPPORT */ +}; + +struct user +{ + struct user_net_io net; /** Network information data */ enum user_state state; /** see enum user_state */ enum user_credentials credentials; /** see enum user_credentials */ struct user_info id; /** Contains nick name and CID */ int flags; /** see enum user_features */ char user_agent[MAX_UA_LEN+1];/** User agent string */ - time_t tm_connected; /** time when user connected */ - time_t tm_last_read; /** time the user last received something from the hub */ - time_t tm_last_write; /** time the user last sent something to the hub */ struct linked_list* feature_cast; /** Features supported by feature cast */ struct adc_message* info; /** ADC 'INF' message (broadcasted to everyone joining the hub) */ - - struct hub_iobuf* send_buf; - struct hub_iobuf* recv_buf; - - size_t send_queue_offset; /** Send queue byte offset */ - struct linked_list* send_queue; /** Send queue */ - int send_queue_size; /** Size of send queue (in bytes, not messages) */ - struct hub_info* hub; /** The hub instance this user belong to */ + struct user_limits limits; /** Data used for limitation */ int quit_reason; /** Quit reason (see user_quit_reason) */ - struct ip_addr_encap ipaddr; /** IP address of connected user */ - struct user_counts limits; /** Data used for limitation */ -#ifdef SSL_SUPPORT - SSL* ssl; /** SSL handle */ -#endif /* SSL_SUPPORT */ + }; @@ -272,6 +273,16 @@ extern int user_set_feature_cast_support(struct user* u, char feature[4]); */ extern void user_clear_feature_cast_support(struct user* u); +/** + * Mark the user with a want-write flag, meaning it should poll for writability. + */ +extern void user_want_write(struct user* user); + +/** + * Mark the user with a want read flag, meaning it should poll for readability. + */ +extern void user_want_read(struct user* user, int timeout_s); + #endif /* HAVE_UHUB_USER_H */ diff --git a/src/usermanager.c b/src/usermanager.c index fb6210a..5f0ab00 100644 --- a/src/usermanager.c +++ b/src/usermanager.c @@ -234,10 +234,13 @@ int uman_send_user_list(struct hub_info* hub, struct user* target) user = (struct user*) list_get_next(hub->users->list); } +#if 0 + FIXME: FIXME FIXME handle send queue excess if (!target->send_queue_size) { user_flag_unset(target, flag_user_list); } +#endif return ret; }