diff --git a/src/core/ioqueue.c b/src/core/ioqueue.c index 6972c71..ab55fb0 100644 --- a/src/core/ioqueue.c +++ b/src/core/ioqueue.c @@ -141,13 +141,13 @@ void ioq_send_remove(struct ioq_send* q, struct adc_message* msg) q->offset = 0; } -int ioq_send_send(struct ioq_send* q, struct hub_user* user) +int ioq_send_send(struct ioq_send* q, struct net_connection* con) { int ret; struct adc_message* msg = list_get_first(q->queue); if (!msg) return 0; uhub_assert(msg->cache && *msg->cache); - ret = net_con_send(user->connection, msg->cache + q->offset, msg->length - q->offset); + ret = net_con_send(con, msg->cache + q->offset, msg->length - q->offset); if (ret > 0) { diff --git a/src/core/ioqueue.h b/src/core/ioqueue.h index 3785950..2624b24 100644 --- a/src/core/ioqueue.h +++ b/src/core/ioqueue.h @@ -60,7 +60,7 @@ extern void ioq_send_add(struct ioq_send*, struct adc_message* msg); * Process the send queue, and send as many messages as possible. * @returns -1 on error, 0 if unable to send more, 1 if more can be sent. */ -extern int ioq_send_send(struct ioq_send*, struct hub_user*); +extern int ioq_send_send(struct ioq_send*, struct net_connection* con); /** * @returns 1 if send queue is empty, 0 otherwise. diff --git a/src/core/netevent.c b/src/core/netevent.c index cdaeea8..e2c4834 100644 --- a/src/core/netevent.c +++ b/src/core/netevent.c @@ -109,7 +109,7 @@ int handle_net_write(struct hub_user* user) int ret = 0; while (ioq_send_get_bytes(user->send_queue)) { - ret = ioq_send_send(user->send_queue, user); + ret = ioq_send_send(user->send_queue, user->connection); if (ret <= 0) break; } diff --git a/src/tools/adcclient.c b/src/tools/adcclient.c index e312872..b0974a2 100644 --- a/src/tools/adcclient.c +++ b/src/tools/adcclient.c @@ -1,6 +1,6 @@ /* * uhub - A tiny ADC p2p connection hub - * Copyright (C) 2007-2011, Jan Vidar Krey + * Copyright (C) 2007-2012, Jan Vidar Krey * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -23,9 +23,53 @@ #define ADC_CID_SIZE 39 #define BIG_BUFSIZE 32768 #define TIGERSIZE 24 +#define MAX_RECV_BUFFER 65536 // #define ADCC_DEBUG // #define ADC_CLIENT_DEBUG_PROTO +enum ADC_client_state +{ + ps_none, /* Not connected */ + ps_conn, /* Connecting... */ + ps_conn_ssl, /* SSL handshake */ + ps_protocol, /* Have sent HSUP */ + ps_identify, /* Have sent BINF */ + ps_verify, /* Have sent HPAS */ + ps_normal, /* Are fully logged in */ +}; + +enum ADC_client_flags +{ + cflag_none = 0, + cflag_ssl = 1, + cflag_choke = 2, + cflag_pipe = 4, +}; + +struct ADC_client +{ + sid_t sid; + enum ADC_client_state state; + struct adc_message* info; + struct ioq_recv* recv_queue; + struct ioq_send* send_queue; + adc_client_cb callback; + size_t s_offset; + size_t r_offset; + size_t timeout; + struct net_connection* con; + struct net_timer* timer; + struct sockaddr_in addr; + char* hub_address; + char* nick; + char* desc; + int flags; +#ifdef SSL_SUPPORT + const SSL_METHOD* ssl_method; + SSL_CTX* ssl_ctx; +#endif /* SSL_SUPPORT */ +}; + static ssize_t ADC_client_recv(struct ADC_client* client); static void ADC_client_send_info(struct ADC_client* client); @@ -36,7 +80,9 @@ static void ADC_client_on_connected_ssl(struct ADC_client* client); static void ADC_client_on_disconnected(struct ADC_client* client); static void ADC_client_on_login(struct ADC_client* client); static int ADC_client_parse_address(struct ADC_client* client, const char* arg); -static void ADC_client_on_recv_line(struct ADC_client* client, const char* line, size_t length); +static int ADC_client_on_recv_line(struct ADC_client* client, const char* line, size_t length); +static void ADC_client_send(struct ADC_client* client, struct adc_message* msg); +static int ADC_client_send_queue(struct ADC_client* client); static void ADC_client_debug(struct ADC_client* client, const char* format, ...) { @@ -146,7 +192,7 @@ static void event_callback(struct net_connection* con, int events, void *arg) if (events & NET_EVENT_WRITE) { - /* FIXME: Call send again */ + ADC_client_send_queue(client); } } } @@ -184,7 +230,7 @@ static void event_callback(struct net_connection* con, int events, void *arg) } while (0) -static void ADC_client_on_recv_line(struct ADC_client* client, const char* line, size_t length) +static int ADC_client_on_recv_line(struct ADC_client* client, const char* line, size_t length) { struct ADC_chat_message chat; struct ADC_client_callback_data data; @@ -199,13 +245,13 @@ static void ADC_client_on_recv_line(struct ADC_client* client, const char* line, if (!msg) { ADC_client_debug(client, "WARNING: Message cannot be decoded: \"%s\"", line); - return; + return -1; } if (length < 4) { ADC_client_debug(client, "Unexpected response from hub: '%s'", line); - return; + return -1; } switch (msg->cmd) @@ -325,68 +371,126 @@ static void ADC_client_on_recv_line(struct ADC_client* client, const char* line, } adc_msg_free(msg); + return 0; } static ssize_t ADC_client_recv(struct ADC_client* client) { + static char buf[BIG_BUFSIZE]; + struct ioq_recv* q = client->recv_queue; + size_t buf_size = ioq_recv_get(q, buf, BIG_BUFSIZE); + ssize_t size; + ADC_TRACE; - ssize_t size = net_con_recv(client->con, &client->recvbuf[client->r_offset], ADC_BUFSIZE - client->r_offset); - if (size <= 0) - return size; - client->r_offset += size; - client->recvbuf[client->r_offset] = 0; + if (client->flags & cflag_choke) + buf_size = 0; + size = net_con_recv(client->con, buf + buf_size, BIG_BUFSIZE - buf_size); - char* start = client->recvbuf; - char* pos; - char* lastPos = 0; - size_t remaining = client->r_offset; + if (size > 0) + buf_size += size; - while ((pos = memchr(start, '\n', remaining))) + if (size < 0) + return -1; + else if (size == 0) + return 0; + else { - pos[0] = 0; + char* lastPos = 0; + char* start = buf; + char* pos = 0; + size_t remaining = buf_size; - ADC_client_on_recv_line(client, start, pos - start); + while ((pos = memchr(start, '\n', remaining))) + { + lastPos = pos; + pos[0] = '\0'; - pos++; - remaining -= (pos - start); - start = pos; - lastPos = pos; +#ifdef DEBUG_SENDQ + LOG_DUMP("PROC: \"%s\" (%d)\n", start, (int) (pos - start)); +#endif + + if (client->flags & cflag_choke) + client->flags &= ~cflag_choke; + else + { + if (((pos - start) > 0) && MAX_RECV_BUFFER > (pos - start)) + { + if (ADC_client_on_recv_line(client, start, pos - start) == -1) + return -1; + } + } + + pos[0] = '\n'; /* FIXME: not needed */ + pos ++; + remaining -= (pos - start); + start = pos; + } + + if (lastPos || remaining) + { + if (remaining < (size_t) MAX_RECV_BUFFER) + { + ioq_recv_set(q, lastPos ? lastPos : buf, remaining); + } + else + { + ioq_recv_set(q, 0, 0); + client->flags |= cflag_choke; + LOG_WARN("Received message past MAX_RECV_BUFFER (%d), dropping message.", MAX_RECV_BUFFER); + } + } + else + { + ioq_recv_set(q, 0, 0); + } + } + return 0; +} + +static int ADC_client_send_queue(struct ADC_client* client) +{ + int ret = 0; + while (ioq_send_get_bytes(client->send_queue)) + { + ret = ioq_send_send(client->send_queue, client->con); + if (ret <= 0) + break; } - if (lastPos) + if (ret < 0) + return quit_socket_error; + + if (ioq_send_get_bytes(client->send_queue)) { - memmove(client->recvbuf, lastPos, remaining); - client->r_offset = remaining; + net_con_update(client->con, NET_EVENT_READ | NET_EVENT_WRITE); + } + else + { + net_con_update(client->con, NET_EVENT_READ); } return 0; } -void ADC_client_send(struct ADC_client* client, char* msg) +void ADC_client_send(struct ADC_client* client, struct adc_message* msg) { ADC_TRACE; - int ret = net_con_send(client->con, msg, strlen(msg)); -#ifdef ADC_CLIENT_DEBUG_PROTO - char* dump = strdup(msg); - dump[strlen(msg) - 1] = 0; - ADC_client_debug(client, "- SEND: '%s'", dump); - free(dump); -#endif + uhub_assert(client->con != NULL); + uhub_assert(msg->cache && *msg->cache); - if (ret != strlen(msg)) + if (ioq_send_is_empty(client->send_queue) && !(client->flags & cflag_pipe)) { - if (ret == -1) - { - if (net_error() != EWOULDBLOCK) - ADC_client_on_disconnected(client); - } - else - { - /* FIXME: Not all data sent! */ - printf("ret (%d) != msg->length (%d)\n", ret, (int) strlen(msg)); - } + /* Perform oportunistic write */ + ioq_send_add(client->send_queue, msg); + ADC_client_send_queue(client); + } + else + { + ioq_send_add(client->send_queue, msg); + if (!(client->flags & cflag_pipe)) + net_con_update(client->con, NET_EVENT_READ | NET_EVENT_WRITE); } } @@ -405,16 +509,17 @@ void ADC_client_send_info(struct ADC_client* client) adc_msg_add_named_argument_string(client->info, ADC_INF_FLAG_USER_AGENT, PRODUCT "/" VERSION); adc_cid_pid(client); - ADC_client_send(client, client->info->cache); + ADC_client_send(client, client->info); } -int ADC_client_create(struct ADC_client* client, const char* nickname, const char* description) + +struct ADC_client* ADC_client_create(const char* nickname, const char* description) { ADC_TRACE; - memset(client, 0, sizeof(struct ADC_client)); + struct ADC_client* client = (struct ADC_client*) hub_malloc_zero(sizeof(struct ADC_client)); int sd = net_socket_create(PF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sd == -1) return -1; + if (sd == -1) return NULL; client->con = net_con_create(); #if 0 @@ -431,7 +536,10 @@ int ADC_client_create(struct ADC_client* client, const char* nickname, const cha client->nick = hub_strdup(nickname); client->desc = hub_strdup(description); - return 0; + client->send_queue = ioq_send_create(); + client->recv_queue = ioq_recv_create(); + + return client; } void ADC_client_destroy(struct ADC_client* client) @@ -482,7 +590,7 @@ static void ADC_client_on_connected(struct ADC_client* client) { ADC_TRACE; #ifdef SSL_SUPPORT - if (client->ssl_enabled) + if (client->flags & cflag_ssl) { net_con_update(client->con, NET_EVENT_READ | NET_EVENT_WRITE); client->callback(client, ADC_CLIENT_SSL_HANDSHAKE, 0); @@ -493,7 +601,7 @@ static void ADC_client_on_connected(struct ADC_client* client) { net_con_update(client->con, NET_EVENT_READ); client->callback(client, ADC_CLIENT_CONNECTED, 0); - ADC_client_send(client, ADC_HANDSHAKE); + ADC_client_send(client, adc_msg_create(ADC_HANDSHAKE)); ADC_client_set_state(client, ps_protocol); } } @@ -553,10 +661,12 @@ static int ADC_client_parse_address(struct ADC_client* client, const char* arg) /* Check for ADC or ADCS */ if (!strncmp(arg, "adc://", 6)) - client->ssl_enabled = 0; + { + client->flags &= ~cflag_ssl; + } else if (!strncmp(arg, "adcs://", 7)) { - client->ssl_enabled = 1; + client->flags |= cflag_ssl; ssl = 1; } else diff --git a/src/tools/adcclient.h b/src/tools/adcclient.h index 2134770..ed5dc75 100644 --- a/src/tools/adcclient.h +++ b/src/tools/adcclient.h @@ -1,6 +1,6 @@ /* * uhub - A tiny ADC p2p connection hub - * Copyright (C) 2007-2011, Jan Vidar Krey + * Copyright (C) 2007-2012, Jan Vidar Krey * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -24,17 +24,6 @@ #define ADC_BUFSIZE 16384 -enum ADC_client_state -{ - ps_none, /* Not connected */ - ps_conn, /* Connecting... */ - ps_conn_ssl, /* SSL handshake */ - ps_protocol, /* Have sent HSUP */ - ps_identify, /* Have sent BINF */ - ps_verify, /* Have sent HPAS */ - ps_normal, /* Are fully logged in */ -}; - struct ADC_client; enum ADC_client_callback_type @@ -118,36 +107,11 @@ struct ADC_client_callback_data typedef int (*adc_client_cb)(struct ADC_client*, enum ADC_client_callback_type, struct ADC_client_callback_data* data); -struct ADC_client -{ - sid_t sid; - enum ADC_client_state state; - struct adc_message* info; - char recvbuf[ADC_BUFSIZE]; - char sendbuf[ADC_BUFSIZE]; - adc_client_cb callback; - size_t s_offset; - size_t r_offset; - size_t timeout; - struct net_connection* con; - struct net_timer* timer; - struct sockaddr_in addr; - char* hub_address; - char* nick; - char* desc; - int ssl_enabled; -#ifdef SSL_SUPPORT - const SSL_METHOD* ssl_method; - SSL_CTX* ssl_ctx; -#endif /* SSL_SUPPORT */ -}; - -int ADC_client_create(struct ADC_client* client, const char* nickname, const char* description); +struct ADC_client* ADC_client_create(const char* nickname, const char* description); void ADC_client_set_callback(struct ADC_client* client, adc_client_cb); void ADC_client_destroy(struct ADC_client* client); int ADC_client_connect(struct ADC_client* client, const char* address); void ADC_client_disconnect(struct ADC_client* client); -void ADC_client_send(struct ADC_client* client, char* msg); #endif /* HAVE_UHUB_ADC_CLIENT_H */ diff --git a/src/tools/admin.c b/src/tools/admin.c index 8b67c86..47b77fa 100644 --- a/src/tools/admin.c +++ b/src/tools/admin.c @@ -163,18 +163,18 @@ int main(int argc, char** argv) return 1; } - struct ADC_client client; + struct ADC_client* client; net_initialize(); memset(g_usermap, 0, sizeof(g_usermap)); - ADC_client_create(&client, "uhub-admin", "stresstester"); - ADC_client_set_callback(&client, handle); - ADC_client_connect(&client, argv[1]); + client = ADC_client_create("uhub-admin", "stresstester"); + ADC_client_set_callback(client, handle); + ADC_client_connect(client, argv[1]); while (running && net_backend_process()) { } - ADC_client_destroy(&client); + ADC_client_destroy(client); net_destroy(); return 0; }