Started working on new pipelines for sending and receiving data.

This will be useful for the next step; SSL.

(NOTE: This code is very chatty about debug messages)
This commit is contained in:
Jan Vidar Krey 2009-05-19 22:57:50 +02:00
parent 9a3a5bc2de
commit 9309c925d3
8 changed files with 218 additions and 94 deletions

View File

@ -133,8 +133,9 @@ libuhub_SOURCES := \
src/commands.c \ src/commands.c \
src/config.c \ src/config.c \
src/eventqueue.c \ src/eventqueue.c \
src/hubevent.c \
src/hub.c \ src/hub.c \
src/hubevent.c \
src/hubio.c \
src/inf.c \ src/inf.c \
src/ipcalc.c \ src/ipcalc.c \
src/list.c \ src/list.c \
@ -161,8 +162,9 @@ uhub_HEADERS := \
src/config.h \ src/config.h \
src/eventid.h \ src/eventid.h \
src/eventqueue.h \ src/eventqueue.h \
src/hubevent.h \
src/hub.h \ src/hub.h \
src/hubevent.h \
src/hubio.h \
src/inf.h \ src/inf.h \
src/ipcalc.h \ src/ipcalc.h \
src/list.h \ src/list.h \

91
src/hubio.c Normal file
View File

@ -0,0 +1,91 @@
/*
* uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2009, Jan Vidar Krey
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include "uhub.h"
#include "hubio.h"
struct hub_iobuf* hub_iobuf_create(size_t max_size)
{
struct hub_iobuf* buf = hub_malloc_zero(sizeof(struct hub_iobuf));
if (buf)
{
buf->buf = hub_malloc(max_size);
buf->capacity = max_size;
}
return buf;
}
void hub_iobuf_destroy(struct hub_iobuf* buf)
{
if (buf)
{
hub_free(buf->buf);
hub_free(buf);
}
}
int hub_iobuf_recv(struct hub_iobuf* buf, hub_iobuf_read r, void* data)
{
int size = r(data, &buf->buf[buf->offset], buf->capacity - buf->offset);
if (size > 0)
{
buf->size += size;
}
return size;
}
int hub_iobuf_send(struct hub_iobuf* buf, hub_iobuf_write w, void* data)
{
int size = w(data, &buf->buf[buf->offset], buf->size - buf->offset);
if (size > 0)
{
buf->offset += size;
}
return size;
}
char* hub_iobuf_getline(struct hub_iobuf* buf, size_t* offset, size_t* len, size_t max_size)
{
size_t x = *offset;
char* pos = memchr(&buf->buf[x], '\n', (buf->size - x));
if (pos)
{
*len = &pos[0] - &buf->buf[x];
pos[0] = '\0';
pos = &buf->buf[x];
(*offset) += (*len + 1);
}
return pos;
}
void hub_iobuf_remove(struct hub_iobuf* buf, size_t n)
{
assert(buf);
assert(n <= buf->size);
buf->offset = 0;
if (n == buf->size)
{
buf->size = 0;
}
}

71
src/hubio.h Normal file
View File

@ -0,0 +1,71 @@
/*
* uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2009, Jan Vidar Krey
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef HAVE_UHUB_HUB_IO_H
#define HAVE_UHUB_HUB_IO_H
/*
* Used as a basis for receive queue, and send queue.
*/
struct hub_iobuf
{
char* buf;
size_t offset;
size_t size;
size_t capacity;
};
typedef int (*hub_iobuf_write)(void* desc, const void* buf, size_t len);
typedef int (*hub_iobuf_read)(void* desc, void* buf, size_t len);
/**
* Create and initialize a io buffer
*/
extern struct hub_iobuf* hub_iobuf_create(size_t max_size);
/**
* Destroy an io buffer.
*/
extern void hub_iobuf_destroy(struct hub_iobuf*);
/**
* net_read() from a socket descriptor into a buffer.
* @return value from net_recv()
*/
extern int hub_iobuf_recv(struct hub_iobuf*, hub_iobuf_read, void* data);
/**
* net_send() data from a buffer to a socket descriptor.
* @return value from net_send()
*/
extern int hub_iobuf_send(struct hub_iobuf*, hub_iobuf_write, void* data);
/**
* Get a line from the buffer
*/
extern char* hub_iobuf_getline(struct hub_iobuf*, size_t* offset, size_t* length, size_t max_size);
/**
* Removes the first 'n' bytes from the buffer.
* This will reset the offset and size parameters.
*/
extern void hub_iobuf_remove(struct hub_iobuf* buf, size_t n);
#endif /* HAVE_UHUB_HUB_IO_H */

View File

@ -18,18 +18,40 @@
*/ */
#include "uhub.h" #include "uhub.h"
#include "hubio.h"
/* FIXME: This should not be needed! */ /* FIXME: This should not be needed! */
extern struct hub_info* g_hub; extern struct hub_info* g_hub;
int net_user_send(void* ptr, const void* buf, size_t len)
{
struct user* user = (struct user*) ptr;
int ret = net_send(user->sd, buf, len, UHUB_SEND_SIGNAL);
printf("net_user_send: %d/%d bytes\n", ret, (int) len);
if (ret == -1)
{
printf(" errno: %d - %s\n", errno, strerror(errno));
}
return ret;
}
int net_user_recv(void* ptr, void* buf, size_t len)
{
struct user* user = (struct user*) ptr;
int ret = net_recv(user->sd, buf, len, 0);
printf("net_user_recv: %d/%d bytes\n", ret, (int) len);
if (ret == -1)
{
printf(" errno: %d - %s\n", errno, strerror(errno));
}
return ret;
}
void net_on_read(int fd, short ev, void *arg) void net_on_read(int fd, short ev, void *arg)
{ {
static char buf[MAX_RECV_BUF];
struct user* user = (struct user*) arg; struct user* user = (struct user*) arg;
char* pos;
size_t offset;
size_t buflen;
ssize_t size;
int more = 1; int more = 1;
int flag_close = 0; int flag_close = 0;
@ -49,16 +71,9 @@ void net_on_read(int fd, short ev, void *arg)
} }
} }
while (more) for (;;)
{ {
offset = 0; ssize_t size = hub_iobuf_recv(user->recv_buf, net_user_recv, user);
if (user->recv_buf)
{
memcpy(buf, user->recv_buf, user->recv_buf_offset);
offset = user->recv_buf_offset;
}
size = net_recv(fd, &buf[offset], MAX_RECV_BUF - offset, 0);
if (size == -1) if (size == -1)
{ {
if (net_error() != EWOULDBLOCK) if (net_error() != EWOULDBLOCK)
@ -72,76 +87,19 @@ void net_on_read(int fd, short ev, void *arg)
} }
else else
{ {
buflen = offset + size; size_t offset = 0;
ssize_t handled = 0; size_t length;
char* line = 0;
while ((pos = memchr(&buf[handled], '\n', (buflen - handled))))
while ((line = hub_iobuf_getline(user->recv_buf, &offset, &length, g_hub->config->max_recv_buffer)))
{ {
pos[0] = '\0'; if (hub_handle_message(g_hub, user, line, length) == -1)
size_t msglen = &pos[0] - &buf[handled];
if (user_flag_get(user, flag_maxbuf))
{ {
user_flag_unset(user, flag_maxbuf); flag_close = quit_protocol_error;
} break;
else
{
if (msglen < g_hub->config->max_recv_buffer)
{
// FIXME: hub is not set????
if (hub_handle_message(g_hub, user, &buf[handled], msglen) == -1)
{
flag_close = quit_protocol_error;
more = 0;
break;
}
}
}
handled += msglen;
handled++;
}
if (handled == 0 && user_flag_get(user, flag_maxbuf))
handled = buflen;
if (!more)
break;
if (handled < buflen)
{
if ((buflen - handled) > g_hub->config->max_recv_buffer)
{
user_flag_set(user, flag_maxbuf);
hub_free(user->recv_buf);
user->recv_buf = 0;
user->recv_buf_offset = 0;
}
else
{
if (!user->recv_buf)
user->recv_buf = hub_malloc(g_hub->config->max_recv_buffer);
if (user->recv_buf)
{
memcpy(user->recv_buf, &buf[handled], buflen - handled);
user->recv_buf_offset = buflen - handled;
}
else
{
flag_close = quit_memory_error;
break;
}
}
}
else
{
if (user->recv_buf)
{
hub_free(user->recv_buf);
user->recv_buf = 0;
user->recv_buf_offset = 0;
} }
} }
hub_iobuf_remove(user->recv_buf, offset);
} }
} }

View File

@ -19,7 +19,6 @@
#include "uhub.h" #include "uhub.h"
int route_message(struct hub_info* hub, struct user* u, struct adc_message* msg) int route_message(struct hub_info* hub, struct user* u, struct adc_message* msg)
{ {
struct user* target = NULL; struct user* target = NULL;

View File

@ -122,6 +122,7 @@
#define TIGERSIZE 24 #define TIGERSIZE 24
#define MAX_RECV_BUF 65535 #define MAX_RECV_BUF 65535
#define MAX_SEND_BUF 65535
#ifndef INET6_ADDRSTRLEN #ifndef INET6_ADDRSTRLEN
#define INET6_ADDRSTRLEN 46 #define INET6_ADDRSTRLEN 46
@ -145,6 +146,7 @@ extern "C" {
#include "sid.h" #include "sid.h"
#include "network.h" #include "network.h"
#include "netevent.h" #include "netevent.h"
#include "hubio.h"
#include "auth.h" #include "auth.h"
#include "tiger.h" #include "tiger.h"
#include "config.h" #include "config.h"

View File

@ -43,14 +43,11 @@ struct user* user_create(struct hub_info* hub, int sd)
user->sd = sd; user->sd = sd;
user->tm_connected = time(NULL); user->tm_connected = time(NULL);
// user->hub = hub;
user->feature_cast = 0;
user->send_queue = list_create(); user->send_queue = list_create();
user->send_queue_offset = 0;
user->send_queue_size = 0; user->send_buf = hub_iobuf_create(MAX_SEND_BUF);
user->recv_buf_offset = 0; user->recv_buf = hub_iobuf_create(MAX_RECV_BUF);
user->recv_buf = 0;
user_set_state(user, state_protocol); user_set_state(user, state_protocol);
return user; return user;
@ -95,6 +92,9 @@ void user_destroy(struct user* user)
list_destroy(user->send_queue); list_destroy(user->send_queue);
} }
user->send_buf = hub_iobuf_create(MAX_SEND_BUF);
user->recv_buf = hub_iobuf_create(MAX_RECV_BUF);
hub_free(user); hub_free(user);
} }

View File

@ -20,9 +20,8 @@
#ifndef HAVE_UHUB_USER_H #ifndef HAVE_UHUB_USER_H
#define HAVE_UHUB_USER_H #define HAVE_UHUB_USER_H
struct hub_info; struct hub_info;
struct hub_iobuf;
enum user_state enum user_state
{ {
@ -111,12 +110,14 @@ struct user
time_t tm_last_write; /** time the user last sent something to the hub */ time_t tm_last_write; /** time the user last sent something to the hub */
struct linked_list* feature_cast; /** Features supported by feature cast */ struct linked_list* feature_cast; /** Features supported by feature cast */
struct adc_message* info; /** ADC 'INF' message (broadcasted to everyone joining the hub) */ struct adc_message* info; /** ADC 'INF' message (broadcasted to everyone joining the hub) */
struct hub_iobuf* send_buf;
struct hub_iobuf* recv_buf;
size_t send_queue_offset; /** Send queue byte offset */ size_t send_queue_offset; /** Send queue byte offset */
struct linked_list* send_queue; /** Send queue */ struct linked_list* send_queue; /** Send queue */
int send_queue_size; /** Size of send queue (in bytes, not messages) */ int send_queue_size; /** Size of send queue (in bytes, not messages) */
int send_queue_esize; /** Effective send queue size */
char* recv_buf; /** Recv buffer */
size_t recv_buf_offset; /** Recv buffer offset */
struct hub_info* hub; /** The hub instance this user belong to */ struct hub_info* hub; /** The hub instance this user belong to */
int quit_reason; /** Quit reason (see user_quit_reason) */ int quit_reason; /** Quit reason (see user_quit_reason) */
struct ip_addr_encap ipaddr; /** IP address of connected user */ struct ip_addr_encap ipaddr; /** IP address of connected user */