Compare commits

...

2 Commits

Author SHA1 Message Date
Jan Vidar Krey 51fba8a7a1 Added support for connecting the hub to an upstream master hub (slave mode). 2013-02-07 12:17:26 +01:00
Jan Vidar Krey a7898779cb Started working on hub linking support (currently disabled). 2013-02-05 22:45:20 +01:00
19 changed files with 881 additions and 36 deletions

2
.gitignore vendored
View File

@ -14,6 +14,7 @@ uhub-admin
adcrush adcrush
uhub uhub
build-stamp build-stamp
build.ninja
debian/files debian/files
debian/uhub.debhelper.log debian/uhub.debhelper.log
debian/uhub.postinst.debhelper debian/uhub.postinst.debhelper
@ -22,3 +23,4 @@ debian/uhub.prerm.debhelper
debian/uhub.substvars debian/uhub.substvars
uhub-passwd uhub-passwd
src/version.h src/version.h

View File

@ -36,6 +36,10 @@ if (SSL_SUPPORT)
endif() endif()
endif() endif()
if (LINK_SUPPORT)
add_definitions(-DLINK_SUPPORT)
endif()
if (SYSTEMD_SUPPORT) if (SYSTEMD_SUPPORT)
INCLUDE(FindPkgConfig) INCLUDE(FindPkgConfig)
pkg_search_module(SD_DAEMON REQUIRED libsystemd-daemon) pkg_search_module(SD_DAEMON REQUIRED libsystemd-daemon)
@ -200,7 +204,7 @@ if (RELEASE)
add_definitions(-DNDEBUG) add_definitions(-DNDEBUG)
else() else()
set(CMAKE_BUILD_TYPE Debug) set(CMAKE_BUILD_TYPE Debug)
# add_definitions(-DDEBUG) add_definitions(-DDEBUG)
endif() endif()
if (UNIX) if (UNIX)

View File

@ -1,6 +1,6 @@
/* /*
* uhub - A tiny ADC p2p connection hub * uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2012, Jan Vidar Krey * Copyright (C) 2007-2013, Jan Vidar Krey
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -34,6 +34,7 @@ typedef uint32_t fourcc_t;
/* default welcome protocol support message, as sent by this server */ /* default welcome protocol support message, as sent by this server */
#define ADC_PROTO_SUPPORT "ADBASE ADTIGR ADPING ADUCMD" #define ADC_PROTO_SUPPORT "ADBASE ADTIGR ADPING ADUCMD"
#define ADC_PROTO_LINK_SUPPORT "ADTIGR ADLINK"
/* Server sent commands */ /* Server sent commands */
#define ADC_CMD_ISID FOURCC('I','S','I','D') #define ADC_CMD_ISID FOURCC('I','S','I','D')
@ -101,6 +102,12 @@ typedef uint32_t fourcc_t;
#define ADC_CMD_HCMD FOURCC('H','C','M','D') #define ADC_CMD_HCMD FOURCC('H','C','M','D')
#define ADC_CMD_ICMD FOURCC('I','C','M','D') #define ADC_CMD_ICMD FOURCC('I','C','M','D')
/* Link commands */
#define ADC_CMD_LSUP FOURCC('L','S','U','P') /* Link support handshake */
#define ADC_CMD_LINF FOURCC('L','I','N','F') /* Hub link info */
#define ADC_CMD_LGPA FOURCC('L','G','P','A') /* Hub link get password */
#define ADC_CMD_LPAS FOURCC('L','P','A','S') /* Hub link password */
#define ADC_CMD_LSTA FOURCC('L','S','T','A') /* Hub link status */
#define ADC_INF_FLAG_IPV4_ADDR "I4" /* ipv4 address */ #define ADC_INF_FLAG_IPV4_ADDR "I4" /* ipv4 address */
#define ADC_INF_FLAG_IPV6_ADDR "I6" /* ipv6 address */ #define ADC_INF_FLAG_IPV6_ADDR "I6" /* ipv6 address */

View File

@ -1,6 +1,6 @@
/* /*
* uhub - A tiny ADC p2p connection hub * uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2010, Jan Vidar Krey * Copyright (C) 2007-2013, Jan Vidar Krey
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -178,6 +178,7 @@ int adc_msg_get_arg_offset(struct adc_message* msg)
case 'I': case 'I':
case 'H': case 'H':
case 'L':
return 4; return 4;
case 'B': case 'B':
@ -383,8 +384,9 @@ struct adc_message* adc_msg_parse(const char* line, size_t length)
ok = 0; ok = 0;
break; break;
case 'I': case 'I': /* Hub to client */
case 'H': case 'H': /* Clien to hub */
case 'L': /* hub to hub Link */
ok = (length > 3); ok = (length > 3);
break; break;
@ -787,6 +789,16 @@ int adc_msg_add_argument(struct adc_message* cmd, const char* string)
return 0; return 0;
} }
int adc_msg_add_argument_string(struct adc_message* cmd, const char* string)
{
char* arg = adc_msg_escape(string);
int ret;
if (!arg) return -1;
ret = adc_msg_add_argument(cmd, arg);
hub_free(arg);
return ret;
}
char* adc_msg_get_argument(struct adc_message* cmd, int offset) char* adc_msg_get_argument(struct adc_message* cmd, int offset)
{ {
@ -868,21 +880,21 @@ int adc_msg_get_argument_index(struct adc_message* cmd, const char prefix[2])
int adc_msg_escape_length(const char* str) size_t adc_msg_escape_length(const char* str)
{ {
int add = 0; size_t add = 0;
int n = 0; size_t n = 0;
for (; str[n]; n++) for (; str[n]; n++)
if (str[n] == ' ' || str[n] == '\n' || str[n] == '\\') add++; if (str[n] == ' ' || str[n] == '\n' || str[n] == '\\') add++;
return n + add; return n + add;
} }
int adc_msg_unescape_length(const char* str) size_t adc_msg_unescape_length(const char* str)
{ {
int add = 0; size_t add = 0;
int n = 0; size_t n = 0;
int escape = 0; size_t escape = 0;
for (; str[n]; n++) for (; str[n]; n++)
{ {
if (escape) if (escape)
@ -998,3 +1010,20 @@ char* adc_msg_escape(const char* string)
return str; return str;
} }
enum msg_type adc_msg_get_type(const struct adc_message* msg)
{
switch (msg->cache[0])
{
case 'B': return msg_type_client_broadcast;
case 'C': return msg_type_client_to_client;
case 'D': return msg_type_client_direct;
case 'E': return msg_type_client_echo;
case 'F': return msg_type_client_feature;
case 'H': return msg_type_client_to_hub;
case 'I': return msg_type_hub_to_client;
case 'L': return msg_type_link_to_link;
case 'U': return msg_type_hub_to_client_udp;
}
return msg_type_unknown;
}

View File

@ -1,6 +1,6 @@
/* /*
* uhub - A tiny ADC p2p connection hub * uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2010, Jan Vidar Krey * Copyright (C) 2007-2013, Jan Vidar Krey
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -43,6 +43,20 @@ enum msg_status_level
status_level_fatal = 2, /* Fatal error (disconnect) */ status_level_fatal = 2, /* Fatal error (disconnect) */
}; };
enum msg_type
{
msg_type_unknown = 0,
msg_type_client_broadcast = 'B',
msg_type_client_to_client = 'C',
msg_type_client_direct = 'D',
msg_type_client_echo = 'E',
msg_type_client_feature = 'F',
msg_type_client_to_hub = 'H',
msg_type_hub_to_client = 'I',
msg_type_link_to_link = 'L',
msg_type_hub_to_client_udp = 'U',
};
/** /**
* Increase the reference counter for an ADC message struct. * Increase the reference counter for an ADC message struct.
* NOTE: Always use the returned value, and not the passed value, as * NOTE: Always use the returned value, and not the passed value, as
@ -171,6 +185,13 @@ extern int adc_msg_replace_named_argument(struct adc_message* cmd, const char pr
*/ */
extern int adc_msg_add_argument(struct adc_message* cmd, const char* string); extern int adc_msg_add_argument(struct adc_message* cmd, const char* string);
/**
* Append a string argumnent.
* The string will automatcally be escaped.
* @return 0 if successful, or -1 if an error occured (out of memory).
*/
extern int adc_msg_add_argument_string(struct adc_message* cmd, const char* string);
/** /**
* Append a named argument * Append a named argument
* *
@ -216,6 +237,12 @@ extern int adc_msg_unescape_to_target(const char* string, char* target, size_t t
*/ */
extern char* adc_msg_escape(const char* string); extern char* adc_msg_escape(const char* string);
/**
* Calculate the length str would be after escaping.
* Does not include any NULL terminator.
*/
size_t adc_msg_escape_length(const char* str);
/** /**
* This will ensure a newline is at the end of the command. * This will ensure a newline is at the end of the command.
*/ */
@ -234,4 +261,6 @@ void adc_msg_unterminate(struct adc_message* cmd);
*/ */
int adc_msg_get_arg_offset(struct adc_message* msg); int adc_msg_get_arg_offset(struct adc_message* msg);
enum msg_type adc_msg_get_type(const struct adc_message* msg);
#endif /* HAVE_UHUB_COMMAND_H */ #endif /* HAVE_UHUB_COMMAND_H */

View File

@ -102,7 +102,7 @@ static int config_parse_line(char* line, int line_count, void* ptr_data)
data = strip_white_space(data); data = strip_white_space(data);
data = strip_off_quotes(data); data = strip_off_quotes(data);
if (!*key || !*data) if (!*key /*|| !*data*/)
{ {
LOG_FATAL("Configuration parse error on line %d", line_count); LOG_FATAL("Configuration parse error on line %d", line_count);
return -1; return -1;

View File

@ -165,6 +165,36 @@
<since>0.3.2</since> <since>0.3.2</since>
</option> </option>
<option name="hub_link_enabled" type="boolean" default="0">
<short>Allow other hubs to link to this hub</short>
<description><![CDATA[
This allows multiple hubs to link to this hub so
that users on the different hubs appear as being on one hub.
This is useful for distributing or load balancing large hubs.
]]></description>
<since>0.5.0</since>
<ifdef>LINK_SUPPORT</ifdef>
</option>
<option name="hub_link_secret" type="string" default="">
<short>A secret token required to accept hub linking</short>
<description><![CDATA[
This should be a secret token needed to authorize hubs
linking into this one.
]]></description>
<since>0.5.0</since>
<ifdef>LINK_SUPPORT</ifdef>
</option>
<option name="hub_link_connect" type="string" default="">
<short>Connect this to hub to another hub</short>
<description><![CDATA[
The other hub must allow links to be established.
Example: uhub://host:port
]]></description>
<since>0.5.0</since>
<ifdef>LINK_SUPPORT</ifdef>
</option>
<option name="max_recv_buffer" type="int" default="4096" advanced="true" > <option name="max_recv_buffer" type="int" default="4096" advanced="true" >
<check min="1024" max="1048576" /> <check min="1024" max="1048576" />

View File

@ -17,6 +17,15 @@ void config_defaults(struct hub_config* config)
config->hub_name = hub_strdup("uhub"); config->hub_name = hub_strdup("uhub");
config->hub_description = hub_strdup("no description"); config->hub_description = hub_strdup("no description");
config->redirect_addr = hub_strdup(""); config->redirect_addr = hub_strdup("");
#ifdef LINK_SUPPORT
config->hub_link_enabled = 0;
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
config->hub_link_secret = hub_strdup("");
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
config->hub_link_connect = hub_strdup("");
#endif /* LINK_SUPPORT */
config->max_recv_buffer = 4096; config->max_recv_buffer = 4096;
config->max_send_buffer = 131072; config->max_send_buffer = 131072;
config->max_send_buffer_soft = 98304; config->max_send_buffer_soft = 98304;
@ -245,6 +254,42 @@ static int apply_config(struct hub_config* config, char* key, char* data, int li
return 0; return 0;
} }
#ifdef LINK_SUPPORT
if (!strcmp(key, "hub_link_enabled"))
{
if (!apply_boolean(key, data, &config->hub_link_enabled))
{
LOG_ERROR("Configuration parse error on line %d", line_count);
return -1;
}
return 0;
}
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
if (!strcmp(key, "hub_link_secret"))
{
if (!apply_string(key, data, &config->hub_link_secret, (char*) ""))
{
LOG_ERROR("Configuration parse error on line %d", line_count);
return -1;
}
return 0;
}
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
if (!strcmp(key, "hub_link_connect"))
{
if (!apply_string(key, data, &config->hub_link_connect, (char*) ""))
{
LOG_ERROR("Configuration parse error on line %d", line_count);
return -1;
}
return 0;
}
#endif /* LINK_SUPPORT */
if (!strcmp(key, "max_recv_buffer")) if (!strcmp(key, "max_recv_buffer"))
{ {
min = 1024; min = 1024;
@ -943,6 +988,14 @@ void free_config(struct hub_config* config)
hub_free(config->redirect_addr); hub_free(config->redirect_addr);
#ifdef LINK_SUPPORT
hub_free(config->hub_link_secret);
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
hub_free(config->hub_link_connect);
#endif /* LINK_SUPPORT */
hub_free(config->tls_require_redirect_addr); hub_free(config->tls_require_redirect_addr);
hub_free(config->tls_certificate); hub_free(config->tls_certificate);
@ -1074,6 +1127,21 @@ void dump_config(struct hub_config* config, int ignore_defaults)
if (!ignore_defaults || strcmp(config->redirect_addr, "") != 0) if (!ignore_defaults || strcmp(config->redirect_addr, "") != 0)
fprintf(stdout, "redirect_addr = \"%s\"\n", config->redirect_addr); fprintf(stdout, "redirect_addr = \"%s\"\n", config->redirect_addr);
#ifdef LINK_SUPPORT
if (!ignore_defaults || config->hub_link_enabled != 0)
fprintf(stdout, "hub_link_enabled = %s\n", config->hub_link_enabled ? "yes" : "no");
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
if (!ignore_defaults || strcmp(config->hub_link_secret, "") != 0)
fprintf(stdout, "hub_link_secret = \"%s\"\n", config->hub_link_secret);
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
if (!ignore_defaults || strcmp(config->hub_link_connect, "") != 0)
fprintf(stdout, "hub_link_connect = \"%s\"\n", config->hub_link_connect);
#endif /* LINK_SUPPORT */
if (!ignore_defaults || config->max_recv_buffer != 4096) if (!ignore_defaults || config->max_recv_buffer != 4096)
fprintf(stdout, "max_recv_buffer = %d\n", config->max_recv_buffer); fprintf(stdout, "max_recv_buffer = %d\n", config->max_recv_buffer);

View File

@ -17,6 +17,15 @@ struct hub_config
char* hub_name; /*<<< Name of hub (default: "uhub") */ char* hub_name; /*<<< Name of hub (default: "uhub") */
char* hub_description; /*<<< Short hub description, topic or subject. (default: "no description") */ char* hub_description; /*<<< Short hub description, topic or subject. (default: "no description") */
char* redirect_addr; /*<<< A common hub redirect address. (default: "") */ char* redirect_addr; /*<<< A common hub redirect address. (default: "") */
#ifdef LINK_SUPPORT
int hub_link_enabled; /*<<< Allow other hubs to link to this hub (default: 0) */
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
char* hub_link_secret; /*<<< A secret token required to accept hub linking (default: "") */
#endif /* LINK_SUPPORT */
#ifdef LINK_SUPPORT
char* hub_link_connect; /*<<< Connect this to hub to another hub (default: "") */
#endif /* LINK_SUPPORT */
int max_recv_buffer; /*<<< Max read buffer before parse, per user (default: 4096) */ int max_recv_buffer; /*<<< Max read buffer before parse, per user (default: 4096) */
int max_send_buffer; /*<<< Max send buffer before disconnect, per user (default: 131072) */ int max_send_buffer; /*<<< Max send buffer before disconnect, per user (default: 131072) */
int max_send_buffer_soft; /*<<< Max send buffer before message drops, per user (default: 98304) */ int max_send_buffer_soft; /*<<< Max send buffer before message drops, per user (default: 98304) */

View File

@ -787,6 +787,13 @@ struct hub_info* hub_start_service(struct hub_config* config)
} }
#endif #endif
#ifdef LINK_SUPPORT
if (config->hub_link_enabled)
{
LOG_INFO("Hub linking support enabled");
}
#endif
hub->config = config; hub->config = config;
hub->users = NULL; hub->users = NULL;
@ -834,6 +841,15 @@ struct hub_info* hub_start_service(struct hub_config* config)
// Start the hub command sub-system // Start the hub command sub-system
hub->commands = command_initialize(hub); hub->commands = command_initialize(hub);
#ifdef LINK_SUPPORT
if (*config->hub_link_connect)
{
link_connect_uri(hub, config->hub_link_connect);
}
#endif
return hub; return hub;
} }
@ -897,23 +913,14 @@ void hub_plugins_unload(struct hub_info* hub)
void hub_set_variables(struct hub_info* hub, struct acl_handle* acl) void hub_set_variables(struct hub_info* hub, struct acl_handle* acl)
{ {
char* tmp;
char* server = adc_msg_escape(PRODUCT_STRING); /* FIXME: OOM */
hub->acl = acl; hub->acl = acl;
hub->command_info = adc_msg_construct(ADC_CMD_IINF, 15); hub->command_info = adc_msg_construct(ADC_CMD_IINF, 15);
if (hub->command_info) if (hub->command_info)
{ {
adc_msg_add_named_argument(hub->command_info, ADC_INF_FLAG_CLIENT_TYPE, ADC_CLIENT_TYPE_HUB); adc_msg_add_named_argument(hub->command_info, ADC_INF_FLAG_CLIENT_TYPE, ADC_CLIENT_TYPE_HUB);
adc_msg_add_named_argument(hub->command_info, ADC_INF_FLAG_USER_AGENT, server); adc_msg_add_named_argument_string(hub->command_info, ADC_INF_FLAG_USER_AGENT, PRODUCT_STRING);
adc_msg_add_named_argument_string(hub->command_info, ADC_INF_FLAG_NICK, hub->config->hub_name);
tmp = adc_msg_escape(hub->config->hub_name); adc_msg_add_named_argument_string(hub->command_info, ADC_INF_FLAG_DESCRIPTION, hub->config->hub_description);
adc_msg_add_named_argument(hub->command_info, ADC_INF_FLAG_NICK, tmp);
hub_free(tmp);
tmp = adc_msg_escape(hub->config->hub_description);
adc_msg_add_named_argument(hub->command_info, ADC_INF_FLAG_DESCRIPTION, tmp);
hub_free(tmp);
} }
hub->command_support = adc_msg_construct(ADC_CMD_ISUP, 6 + strlen(ADC_PROTO_SUPPORT)); hub->command_support = adc_msg_construct(ADC_CMD_ISUP, 6 + strlen(ADC_PROTO_SUPPORT));
@ -922,16 +929,14 @@ void hub_set_variables(struct hub_info* hub, struct acl_handle* acl)
adc_msg_add_argument(hub->command_support, ADC_PROTO_SUPPORT); adc_msg_add_argument(hub->command_support, ADC_PROTO_SUPPORT);
} }
hub->command_banner = adc_msg_construct(ADC_CMD_ISTA, 100 + strlen(server)); hub->command_banner = adc_msg_construct(ADC_CMD_ISTA, 100 + adc_msg_escape_length(PRODUCT_STRING));
if (hub->command_banner) if (hub->command_banner)
{ {
if (hub->config->show_banner_sys_info)
tmp = adc_msg_escape("Powered by " PRODUCT_STRING " on " OPSYS "/" CPUINFO);
else
tmp = adc_msg_escape("Powered by " PRODUCT_STRING);
adc_msg_add_argument(hub->command_banner, "000"); adc_msg_add_argument(hub->command_banner, "000");
adc_msg_add_argument(hub->command_banner, tmp); if (hub->config->show_banner_sys_info)
hub_free(tmp); adc_msg_add_argument_string(hub->command_banner, "Powered by " PRODUCT_STRING " on " OPSYS "/" CPUINFO);
else
adc_msg_add_argument_string(hub->command_banner, "Powered by " PRODUCT_STRING);
} }
if (hub_plugins_load(hub) < 0) if (hub_plugins_load(hub) < 0)
@ -942,7 +947,6 @@ void hub_set_variables(struct hub_info* hub, struct acl_handle* acl)
else else
hub->status = (hub->config->hub_enabled ? hub_status_running : hub_status_disabled); hub->status = (hub->config->hub_enabled ? hub_status_running : hub_status_disabled);
hub_free(server);
} }

View File

@ -115,6 +115,10 @@ struct hub_info
struct command_base* commands; /* Hub command handler */ struct command_base* commands; /* Hub command handler */
struct uhub_plugins* plugins; /* Plug-ins loaded for this hub instance. */ struct uhub_plugins* plugins; /* Plug-ins loaded for this hub instance. */
#ifdef LINK_SUPPORT
struct linked_list* hub_links; /* Other hubs linked to this hub */
#endif
#ifdef SSL_SUPPORT #ifdef SSL_SUPPORT
struct ssl_context_handle* ctx; struct ssl_context_handle* ctx;
#endif /* SSL_SUPPORT */ #endif /* SSL_SUPPORT */

View File

@ -49,6 +49,34 @@ void ioq_recv_destroy(struct ioq_recv* q)
} }
} }
#define IOQ_RECV_FLAGS_PREALLOC 1
#define IOQ_RECV_FLAGS_FULL 2
enum ioq_recv_status ioq_recv_read(struct ioq_recv* q, struct net_connection* con)
{
static char buf[MAX_RECV_BUF];
size_t buf_size = ioq_recv_get(q, buf, MAX_RECV_BUF);
ssize_t size;
if (buf_size >= MAX_RECV_BUF)
return ioq_recv_full;
size = net_con_recv(con, buf + buf_size, MAX_RECV_BUF - buf_size);
if (size > 0)
buf_size += size;
if (size < 0)
return ioq_recv_error;
if (size == 0)
return ioq_recv_later;
ioq_recv_set(q, buf, buf_size);
return ioq_recv_ok;
}
size_t ioq_recv_get(struct ioq_recv* q, void* buf, size_t bufsize) size_t ioq_recv_get(struct ioq_recv* q, void* buf, size_t bufsize)
{ {
uhub_assert(bufsize >= q->size); uhub_assert(bufsize >= q->size);
@ -88,6 +116,21 @@ size_t ioq_recv_set(struct ioq_recv* q, void* buf, size_t bufsize)
} }
int ioq_recv_consume(struct ioq_recv* q, size_t bytes)
{
size_t newsize;
void* ptr;
if (!q || bytes > q->size) return 0;
newsize = (q->size - bytes);
memmove(q->buf, q->buf + bytes, newsize);
ptr = hub_realloc(q->buf, newsize);
q->buf = ptr;
q->size = newsize;
return 1;
}
struct ioq_send* ioq_send_create() struct ioq_send* ioq_send_create()
{ {
struct ioq_send* q = hub_malloc_zero(sizeof(struct ioq_send)); struct ioq_send* q = hub_malloc_zero(sizeof(struct ioq_send));

View File

@ -39,6 +39,7 @@ struct ioq_recv
{ {
char* buf; char* buf;
size_t size; size_t size;
// int flags;
}; };
/** /**
@ -102,5 +103,26 @@ extern size_t ioq_recv_set(struct ioq_recv*, void* buf, size_t bufsize);
extern int ioq_recv_is_empty(struct ioq_recv* buf); extern int ioq_recv_is_empty(struct ioq_recv* buf);
enum ioq_recv_status
{
ioq_recv_ok = 0, // read data OK
ioq_recv_later = 1, // all OK, but call again later (no change)
ioq_recv_full = 2, // all OK, but the buffer is full
ioq_recv_error = 3, // error (connection is not working)
};
/**
* Receive from connection into buffer.
*/
extern enum ioq_recv_status ioq_recv_read(struct ioq_recv* q, struct net_connection* con);
/**
* Consume 'bytes' bytes.
* 'bytes' must be <= q->size
*
* @return 1 on success, or 0 on error (only if q == NULL or bytes is > q->size).
*/
extern int ioq_recv_consume(struct ioq_recv* q, size_t bytes);
#endif /* HAVE_UHUB_IO_QUEUE_H */ #endif /* HAVE_UHUB_IO_QUEUE_H */

508
src/core/link.c Normal file
View File

@ -0,0 +1,508 @@
/*
* uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2013, Jan Vidar Krey
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include "uhub.h"
#ifdef LINK_SUPPORT
static int link_send_support(struct hub_link* link);
static void link_net_event(struct net_connection* con, int event, void *arg)
{
LOG_INFO("link_net_event(), event=%d", event);
struct hub_link* link = (struct hub_link*) arg;
struct hub_info* hub = link->hub;
int ret = 0;
if (event == NET_EVENT_TIMEOUT)
{
LOG_DEBUG("Hub link timeout!");
}
if (event & NET_EVENT_READ)
{
ret = link_handle_read(link);
if (ret < 0)
{
link_disconnect(link);
return;
}
}
if (event & NET_EVENT_WRITE)
{
ret = link_handle_write(link);
if (ret < 0)
{
link_disconnect(link);
return;
}
}
}
void link_disconnect(struct hub_link* link)
{
if (link->connection)
net_con_close(link->connection);
link->connection = NULL;
ioq_send_destroy(link->send_queue);
ioq_recv_destroy(link->recv_queue);
link->send_queue = NULL;
link->recv_queue = NULL;
// FIXME: Notify hub and disconnect users!
hub_free(link);
}
static struct hub_link* link_create_internal(struct hub_info* hub)
{
struct hub_link* link = NULL;
LOG_DEBUG("link_create_internal(), hub=%p");
link = (struct hub_link*) hub_malloc_zero(sizeof(struct hub_link));
if (link == NULL)
return NULL; /* OOM */
link->send_queue = ioq_send_create();
link->recv_queue = ioq_recv_create();
link->hub = hub;
link->state = state_protocol;
return link;
}
struct hub_link* link_create(struct hub_info* hub, struct net_connection* con, struct ip_addr_encap* addr)
{
struct hub_link* link = link_create_internal(hub);
link->connection = con;
net_con_reinitialize(link->connection, link_net_event, link, NET_EVENT_READ);
link->mode = link_mode_server;
return link;
}
static void link_connect_callback(struct net_connect_handle* handle, enum net_connect_status status, struct net_connection* con, void* ptr)
{
struct hub_link* link = (struct hub_link*) ptr;
link->connect_job = NULL;
LOG_DEBUG("link_connect_callback()");
switch (status)
{
case net_connect_status_ok:
link->connection = con;
net_con_reinitialize(link->connection, link_net_event, link, NET_EVENT_READ);
// FIXME: send handshake here
link_send_support(link);
break;
case net_connect_status_host_not_found:
case net_connect_status_no_address:
case net_connect_status_dns_error:
case net_connect_status_refused:
case net_connect_status_unreachable:
case net_connect_status_timeout:
case net_connect_status_socket_error:
// FIXME: Unable to connect - start timer and re-try connection establishment!
break;
}
}
struct link_address
{
char host[256];
uint16_t port;
};
static int link_parse_address(const char* arg, struct link_address* addr)
{
int port;
char* split;
memset(addr, 0, sizeof(struct link_address));
/* Split hostname and port (if possible) */
split = strrchr(arg, ':');
if (split == 0 || strlen(split) < 2 || strlen(split) > 6)
return 0;
/* Ensure port number is valid */
port = strtol(split+1, NULL, 10);
if (port <= 0 || port > 65535)
return 0;
memcpy(addr->host, arg, &split[0] - &arg[0]);
addr->port = port;
return 1;
}
struct hub_link* link_connect_uri(struct hub_info* hub, const char* address)
{
struct link_address link_address;
if (!link_parse_address(address, &link_address))
{
LOG_INFO("Invalid master hub link address");
return NULL;
}
return link_connect(hub, link_address.host, link_address.port);
}
struct hub_link* link_connect(struct hub_info* hub, const char* address, uint16_t port)
{
struct hub_link* link = link_create_internal(hub);
LOG_DEBUG("Connecting to master link at %s:%d...", address, port);
link->mode = link_mode_client;
link->connect_job = net_con_connect(address, port, link_connect_callback, link);
if (!link->connect_job)
{
// FIXME: Immediate failure!
LOG_DEBUG("Error connecting to master hub link.");
link_disconnect(link);
return NULL;
}
return link;
}
static int link_net_io_want_read(struct hub_link* link)
{
net_con_update(link->connection, NET_EVENT_READ);
}
static int link_net_io_want_write(struct hub_link* link)
{
net_con_update(link->connection, NET_EVENT_READ | NET_EVENT_WRITE);
}
int link_handle_write(struct hub_link* link)
{
int ret = 0;
while (ioq_send_get_bytes(link->send_queue))
{
ret = ioq_send_send(link->send_queue, link->connection);
if (ret <= 0)
break;
}
if (ret < 0)
return -1; // FIXME! Extract socket error!
if (ioq_send_get_bytes(link->send_queue))
link_net_io_want_write(link);
else
link_net_io_want_read(link);
return 0;
}
int link_send_message(struct hub_link* link, struct adc_message* msg)
{
#ifdef DEBUG_SENDQ
char* data = strndup(msg->cache, msg->length-1);
LOG_PROTO("[link] send %p: \"%s\"", link, data);
free(data);
#endif
if (!link->connection)
return -1;
uhub_assert(msg->cache && *msg->cache);
if (ioq_send_is_empty(link->send_queue) /*&& !user_flag_get(user, flag_pipeline)*/)
{
/* Perform oportunistic write */
ioq_send_add(link->send_queue, msg);
link_handle_write(link);
}
else
{
// if (check_send_queue(hub, user, msg) >= 0)
// {
ioq_send_add(link->send_queue, msg);
// if (!user_flag_get(user, flag_pipeline))
link_net_io_want_write(link);
}
return 0;
}
static int link_send_support(struct hub_link* link)
{
int ret;
struct adc_message* msg = adc_msg_construct(ADC_CMD_LSUP, 6 + strlen(ADC_PROTO_LINK_SUPPORT));
adc_msg_add_argument(msg, ADC_PROTO_LINK_SUPPORT);
ret = link_send_message(link, msg);
adc_msg_free(msg);
return ret;
}
static int link_send_welcome(struct hub_link* link)
{
int ret;
struct adc_message* info = adc_msg_construct(ADC_CMD_LINF, 128);
if (!info)
return -1;
adc_msg_add_named_argument(info, ADC_INF_FLAG_CLIENT_TYPE, ADC_CLIENT_TYPE_HUB);
adc_msg_add_named_argument_string(info, ADC_INF_FLAG_USER_AGENT, PRODUCT_STRING);
adc_msg_add_named_argument_string(info, ADC_INF_FLAG_NICK, link->hub->config->hub_name);
adc_msg_add_named_argument_string(info, ADC_INF_FLAG_DESCRIPTION, link->hub->config->hub_description);
ret = link_send_message(link, info);
link->state = state_normal;
}
static int link_send_auth_response(struct hub_link* link, const char* challenge)
{
int ret;
struct adc_message* msg = adc_msg_construct(ADC_CMD_LPAS, 128);
// FIXME: Solve challenge.
ret = link_send_message(link, msg);
adc_msg_free(msg);
return ret;
}
static int link_send_auth_request(struct hub_link* link)
{
int ret;
struct adc_message* msg = adc_msg_construct(ADC_CMD_LGPA, 128);
// FIXME: Create challenge.
char buf[64];
uint64_t tiger_res[3];
static char tiger_buf[MAX_CID_LEN+1];
LOG_DEBUG("link_send_auth_request");
// FIXME: Generate a better nonce scheme.
snprintf(buf, 64, "%p%d", link, (int) net_con_get_sd(link->connection));
tiger((uint64_t*) buf, strlen(buf), (uint64_t*) tiger_res);
base32_encode((unsigned char*) tiger_res, TIGERSIZE, tiger_buf);
tiger_buf[MAX_CID_LEN] = 0;
// Add nonce to message
adc_msg_add_argument(msg, (const char*) tiger_buf);
ret = link_send_message(link, msg);
adc_msg_free(msg);
return ret;
}
static int link_handle_support(struct hub_link* link, struct adc_message* msg)
{
int ret = 0;
LOG_DEBUG("link_handle_support");
if (link->mode == link_mode_server)
{
if (link->state == state_protocol)
{
ret = link_send_support(link);
if (ret == 0)
ret = link_send_auth_request(link);
link->state = state_verify;
}
}
return ret;
}
static int link_handle_auth_request(struct hub_link* link, struct adc_message* msg)
{
char* challenge;
int ret = -1;
LOG_DEBUG("link_handle_auth_request");
if (link->state == state_verify)
return -1;
if (link->mode == link_mode_client)
{
challenge = adc_msg_get_argument(msg, 0);
ret = link_send_auth_response(link, challenge);
hub_free(challenge);
}
return ret;
}
static int link_handle_auth_response(struct hub_link* link, struct adc_message* msg)
{
LOG_DEBUG("link_handle_auth_response. link_state=%d", (int) link->state);
if (link->state != state_verify)
return -1;
LOG_DEBUG("State is not verify!");
if (link->mode == link_mode_server)
{
// Check authentication data
// FIXME: Can involve plug-ins at this point.
return link_send_welcome(link);
}
else
{
LOG_DEBUG("Ignoring auth response - We're client mode!");
}
return -1;
}
static int link_handle_link_info(struct hub_link* link, struct adc_message* msg)
{
LOG_DEBUG("link_handle_link_info");
return 0;
}
static int link_handle_status(struct hub_link* link, struct adc_message* msg)
{
LOG_DEBUG("link_handle_status");
return -1;
}
static int link_handle_message(struct hub_link* link, const char* message, size_t length)
{
int ret = 0;
struct adc_message* cmd = 0;
LOG_INFO("link_handle_message(): %s (%d)", message, (int) length);
// FIXME: is this needed?
if (link->state == state_cleanup || link->state == state_disconnected)
return -1;
cmd = adc_msg_parse(message, length);
if (!cmd)
{
LOG_DEBUG("Unable to parse hub-link message");
return -1;
}
// if (
switch (cmd->cmd)
{
case ADC_CMD_LSUP:
ret = link_handle_support(link, cmd);
break;
case ADC_CMD_LPAS:
ret = link_handle_auth_response(link, cmd);
break;
case ADC_CMD_LGPA:
ret = link_handle_auth_request(link, cmd);
break;
case ADC_CMD_LINF:
ret = link_handle_link_info(link, cmd);
break;
case ADC_CMD_LSTA:
ret = link_handle_status(link, cmd);
break;
}
adc_msg_free(cmd);
return ret;
}
static int link_read_message(struct hub_link* link)
{
char* lastPos = 0;
char* pos = 0;
char* start = link->recv_queue->buf;
size_t remaining = link->recv_queue->size;
while ((pos = memchr(start, '\n', remaining)))
{
lastPos = pos+1;
pos[0] = '\0';
if (link->flags & 1)
{
/* FIXME Unset maxbuf flag */
link->flags = 0;
}
else
{
if (link_handle_message(link, start, (pos - start)) == -1)
{
return -1;
}
}
pos[0] = '\n'; /* FIXME: not needed */
pos ++;
remaining -= (pos - start);
start = pos;
}
ioq_recv_consume(link->recv_queue, (start - link->recv_queue->buf));
return 0;
}
int link_handle_read(struct hub_link* link)
{
int ret = 0;
while (1)
{
switch (ioq_recv_read(link->recv_queue, link->connection))
{
case ioq_recv_ok:
if (link_read_message(link) < 0)
{
// FIXME: propagate protocol error?
return -1;
}
// Parse messages then call again
break;
case ioq_recv_later:
return 0;
case ioq_recv_full:
link->flags = 1; // FIXME: MAXBUF
ioq_recv_set(link->recv_queue, 0, 0);
break;
case ioq_recv_error:
return -1; // FIXME: it would be good to signal type of socket error
}
}
return 0;
}
#endif /* LINK_SUPPORT */

70
src/core/link.h Normal file
View File

@ -0,0 +1,70 @@
/*
* uhub - A tiny ADC p2p connection hub
* Copyright (C) 2007-2013, Jan Vidar Krey
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef HAVE_UHUB_LINK_H
#define HAVE_UHUB_LINK_H
#ifdef LINK_SUPPORT
struct hub_link
{
char name[MAX_NICK_LEN+1]; /** The name of the linked hub */
char user_agent[MAX_UA_LEN+1]; /** The user agent of the linked hub */
char address[256]; /** The official address of the linked hub */
enum link_mode { link_mode_client, link_mode_server } mode;
enum user_state state;
struct ioq_send* send_queue;
struct ioq_recv* recv_queue;
struct net_connection* connection; /** Connection data */
struct net_connect_handle* connect_job; /** Only used when establishing a connection in client mode */
struct hub_info* hub;
int flags;
};
/**
* Create a link from an accepted connection (act as a link server).
*/
extern struct hub_link* link_create(struct hub_info* hub, struct net_connection* con, struct ip_addr_encap* addr);
/**
* Connect this hub to an upstream server (act as a link client).
*/
extern struct hub_link* link_connect(struct hub_info* hub, const char* address, uint16_t port);
extern struct hub_link* link_connect_uri(struct hub_info* hub, const char* address);
/**
* Disconnect a link connection.
*/
extern void link_disconnect(struct hub_link*);
/**
* Read from link connection and process messages.
* @return 0 on success, and a negative value otherwise
*/
extern int link_handle_read(struct hub_link* link);
/**
* Write queued messages to the link.
* @return 0 on success, and a negative value otherwise.
*/
extern int link_handle_write(struct hub_link* link);
#endif // LINK_SUPPORT
#endif /* HAVE_UHUB_LINK_H */

View File

@ -125,6 +125,10 @@ int main_loop()
hub_set_log_verbosity(arg_verbose); hub_set_log_verbosity(arg_verbose);
} }
#ifdef DEBUG
LOG_INFO("Debug messages enabled");
#endif
if (read_config(arg_config, &configuration, !arg_have_config) == -1) if (read_config(arg_config, &configuration, !arg_have_config) == -1)
return -1; return -1;

View File

@ -71,6 +71,17 @@ static void probe_net_event(struct net_connection* con, int events, void *arg)
probe_destroy(probe); probe_destroy(probe);
return; return;
} }
#ifdef LINK_SUPPORT
else if (probe->hub->config->hub_link_enabled && memcmp(probe_recvbuf, "LSUP", 4) == 0)
{
if (link_create(probe->hub, probe->connection, &probe->addr))
{
probe->connection = 0;
}
probe_destroy(probe);
return;
}
#endif /* LINK_SUPPORT */
#ifdef SSL_SUPPORT #ifdef SSL_SUPPORT
else if (bytes >= 11 && else if (bytes >= 11 &&
probe_recvbuf[0] == 22 && probe_recvbuf[0] == 22 &&

View File

@ -99,7 +99,7 @@ int route_to_user(struct hub_info* hub, struct hub_user* user, struct adc_messag
{ {
#ifdef DEBUG_SENDQ #ifdef DEBUG_SENDQ
char* data = strndup(msg->cache, msg->length-1); char* data = strndup(msg->cache, msg->length-1);
LOG_PROTO("send %s: \"%s\"", sid_to_string(user->id.sid), data); LOG_PROTO("[user] send %s: \"%s\"", sid_to_string(user->id.sid), data);
free(data); free(data);
#endif #endif

View File

@ -94,6 +94,7 @@ extern "C" {
#include "core/commands.h" #include "core/commands.h"
#include "core/inf.h" #include "core/inf.h"
#include "core/hubevent.h" #include "core/hubevent.h"
#include "core/link.h"
#include "core/plugincallback.h" #include "core/plugincallback.h"
#include "core/plugininvoke.h" #include "core/plugininvoke.h"
#include "core/pluginloader.h" #include "core/pluginloader.h"