Ensure we set a max recv and send buffer per user, and that it does not

exceed the one used internally by the application.
This commit is contained in:
Jan Vidar Krey 2009-06-22 19:50:10 +02:00
parent 57fd872f14
commit 1d9acece34
2 changed files with 74 additions and 66 deletions

View File

@ -34,7 +34,7 @@ int hub_handle_message(struct hub_info* hub, struct user* u, const char* line, s
if (user_is_disconnecting(u)) if (user_is_disconnecting(u))
return -1; return -1;
cmd = adc_msg_parse_verify(u, line, length); cmd = adc_msg_parse_verify(u, line, length);
if (cmd) if (cmd)
{ {

View File

@ -160,64 +160,58 @@ void net_on_read(int fd, short ev, void *arg)
buf_size = hub_recvq_get(q, buf, MAX_RECV_BUF); buf_size = hub_recvq_get(q, buf, MAX_RECV_BUF);
for (;;) int size = net_user_recv(user, &buf[buf_size], MAX_RECV_BUF - buf_size);
if (size > 0)
{ {
int size = net_user_recv(user, &buf[buf_size], MAX_RECV_BUF - buf_size); buf_size += size;
if (size > 0) }
if (size == -1)
{
if (net_error() != EWOULDBLOCK)
flag_close = quit_socket_error;
}
else if (size == 0)
{
flag_close = quit_disconnected;
}
else
{
size_t offset = 0;
size_t length;
char* start = buf;
char* pos = 0;
while ((pos = memchr(start, '\n', (buf_size - offset))))
{ {
buf_size += size; char* line = start;
length = pos - start;
pos[0] = '\0';
#ifdef DEBUG_SENDQ
printf("PROC: \"%s\" (%d)\n", line, (int) length);
#endif
if (hub_handle_message(g_hub, user, line, length) == -1)
{
flag_close = quit_protocol_error;
break;
}
start = pos;
start++;
offset += length;
} }
if (size == -1) if (start < buf + buf_size)
{ {
if (net_error() != EWOULDBLOCK) hub_recvq_set(q, buf+offset, buf_size);
flag_close = quit_socket_error;
break;
}
else if (size == 0)
{
flag_close = quit_disconnected;
break;
} }
else else
{ {
size_t offset = 0; hub_recvq_set(q, 0, 0);
size_t length;
char* start = buf;
char* pos = 0;
while ((pos = memchr(start, '\n', (buf_size - offset))))
{
char* line = start;
length = pos - start;
pos[0] = '\0';
#ifdef DEBUG_SENDQ
printf("PROC: \"%s\" (%d)\n", line, (int) length);
#endif
if (hub_handle_message(g_hub, user, line, length) == -1)
{
flag_close = quit_protocol_error;
break;
}
start = pos;
start++;
offset += length;
}
if (start < buf + buf_size)
{
hub_recvq_set(q, buf+offset, buf_size);
}
else
{
hub_recvq_set(q, 0, 0);
}
} }
} }
if (flag_close) if (flag_close)
{ {
hub_disconnect_user(g_hub, user, flag_close); hub_disconnect_user(g_hub, user, flag_close);
@ -257,19 +251,41 @@ void net_on_write(int fd, short ev, void *arg)
break; break;
} }
#if 0
if (close_flag)
{
hub_disconnect_user(g_hub, user, close_flag);
}
else
#endif
if (hub_sendq_get_bytes(user->net.send_queue)) if (hub_sendq_get_bytes(user->net.send_queue))
{ {
user_net_io_want_write(user); user_net_io_want_write(user);
} }
} }
static void prepare_user_net(struct hub_info* hub, struct user* user)
{
int fd = user->net.sd;
struct timeval timeout = { TIMEOUT_CONNECTED, 0 };
size_t sendbuf = 0;
size_t recvbuf = 0;
net_set_nonblocking(fd, 1);
net_set_nosigpipe(fd, 1);
if (net_get_recvbuf_size(fd, &recvbuf) != -1)
{
if (recvbuf > MAX_RECV_BUF || !recvbuf) recvbuf = MAX_RECV_BUF;
net_set_recvbuf_size(fd, recvbuf);
}
if (net_get_sendbuf_size(fd, &sendbuf) != -1)
{
if (sendbuf > MAX_SEND_BUF || !sendbuf) sendbuf = MAX_SEND_BUF;
net_set_sendbuf_size(fd, sendbuf);
}
event_set(user->net.ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user);
event_set(user->net.ev_write, fd, EV_WRITE, net_on_write, user);
event_base_set(hub->evbase, user->net.ev_read);
event_base_set(hub->evbase, user->net.ev_write);
event_add(user->net.ev_read, &timeout);
}
void net_on_accept(int server_fd, short ev, void *arg) void net_on_accept(int server_fd, short ev, void *arg)
{ {
@ -277,7 +293,6 @@ void net_on_accept(int server_fd, short ev, void *arg)
struct user* user = 0; struct user* user = 0;
struct ip_addr_encap ipaddr; struct ip_addr_encap ipaddr;
const char* addr; const char* addr;
struct timeval timeout = { TIMEOUT_CONNECTED, 0 };
for (;;) for (;;)
{ {
@ -318,15 +333,8 @@ void net_on_accept(int server_fd, short ev, void *arg)
/* Store IP address in user object */ /* Store IP address in user object */
memcpy(&user->net.ipaddr, &ipaddr, sizeof(ipaddr)); memcpy(&user->net.ipaddr, &ipaddr, sizeof(ipaddr));
net_set_nonblocking(fd, 1); prepare_user_net(hub, user);
net_set_nosigpipe(fd, 1);
event_set(user->net.ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user);
event_set(user->net.ev_write, fd, EV_WRITE, net_on_write, user);
event_base_set(hub->evbase, user->net.ev_read);
event_base_set(hub->evbase, user->net.ev_write);
event_add(user->net.ev_read, &timeout);
} }
} }