Converted the DNS resolver to work with the new threading API abstraction.

This commit is contained in:
Jan Vidar Krey 2012-10-25 00:40:16 +02:00
parent 168fc5bfcc
commit 470c936e63
1 changed files with 28 additions and 47 deletions

View File

@ -19,7 +19,7 @@
#include "uhub.h" #include "uhub.h"
#define DEBUG_LOOKUP_TIME 1 // #define DEBUG_LOOKUP_TIME 1
#define MAX_CONCURRENT_JOBS 25 #define MAX_CONCURRENT_JOBS 25
static struct net_dns_job* find_and_remove_job(struct net_dns_job* job); static struct net_dns_job* find_and_remove_job(struct net_dns_job* job);
@ -38,9 +38,7 @@ struct net_dns_job
struct timeval time_finish; struct timeval time_finish;
#endif #endif
#ifdef POSIX_THREAD_SUPPORT uhub_thread_t* thread_handle;
pthread_t thread_handle;
#endif
}; };
struct net_dns_result struct net_dns_result
@ -64,9 +62,7 @@ struct net_dns_subsystem
{ {
struct linked_list* jobs; // currently running jobs struct linked_list* jobs; // currently running jobs
struct linked_list* results; // queue of results that are awaiting being delivered to callback. struct linked_list* results; // queue of results that are awaiting being delivered to callback.
#ifdef POSIX_THREAD_SUPPORT uhub_mutex_t mutex;
pthread_mutex_t mutex;
#endif // POSIX_THREAD_SUPPORT
}; };
static struct net_dns_subsystem* g_dns = NULL; static struct net_dns_subsystem* g_dns = NULL;
@ -77,43 +73,36 @@ void net_dns_initialize()
g_dns = (struct net_dns_subsystem*) hub_malloc_zero(sizeof(struct net_dns_subsystem)); g_dns = (struct net_dns_subsystem*) hub_malloc_zero(sizeof(struct net_dns_subsystem));
g_dns->jobs = list_create(); g_dns->jobs = list_create();
g_dns->results = list_create(); g_dns->results = list_create();
#ifdef POSIX_THREAD_SUPPORT uhub_mutex_init(&g_dns->mutex);
pthread_mutex_init(&g_dns->mutex, NULL);
#endif
} }
void net_dns_destroy() void net_dns_destroy()
{ {
#ifdef POSIX_THREAD_SUPPORT
struct net_dns_job* job; struct net_dns_job* job;
pthread_mutex_lock(&g_dns->mutex); uhub_mutex_lock(&g_dns->mutex);
LOG_TRACE("net_dns_destroy(): jobs=%d", (int) list_size(g_dns->jobs)); LOG_TRACE("net_dns_destroy(): jobs=%d", (int) list_size(g_dns->jobs));
job = (struct net_dns_job*) list_get_first(g_dns->jobs); job = (struct net_dns_job*) list_get_first(g_dns->jobs);
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
while (job) while (job)
{ {
net_dns_job_cancel(job); net_dns_job_cancel(job);
pthread_mutex_lock(&g_dns->mutex); uhub_mutex_lock(&g_dns->mutex);
job = (struct net_dns_job*) list_get_first(g_dns->jobs); job = (struct net_dns_job*) list_get_first(g_dns->jobs);
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
} }
#endif
} }
#ifdef POSIX_THREAD_SUPPORT
static void dummy_free(void* ptr) static void dummy_free(void* ptr)
{ {
} }
#endif
void net_dns_process() void net_dns_process()
{ {
#ifdef POSIX_THREAD_SUPPORT
struct net_dns_result* result; struct net_dns_result* result;
pthread_mutex_lock(&g_dns->mutex); uhub_mutex_lock(&g_dns->mutex);
LOG_DUMP("net_dns_process(): jobs=%d, results=%d", (int) list_size(g_dns->jobs), (int) list_size(g_dns->results)); LOG_DUMP("net_dns_process(): jobs=%d, results=%d", (int) list_size(g_dns->jobs), (int) list_size(g_dns->results));
for (result = (struct net_dns_result*) list_get_first(g_dns->results); result; result = (struct net_dns_result*) list_get_next(g_dns->results)) for (result = (struct net_dns_result*) list_get_first(g_dns->results); result; result = (struct net_dns_result*) list_get_next(g_dns->results))
@ -126,7 +115,7 @@ void net_dns_process()
#endif #endif
// wait for the work thread to finish // wait for the work thread to finish
pthread_join(job->thread_handle, NULL); uhub_thread_join(job->thread_handle);
// callback - should we delete the data immediately? // callback - should we delete the data immediately?
if (job->callback(job, result)) if (job->callback(job, result))
@ -145,11 +134,9 @@ void net_dns_process()
} }
list_clear(g_dns->results, &dummy_free); list_clear(g_dns->results, &dummy_free);
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
#endif // POSIX_THREAD_SUPPORT
} }
#ifdef POSIX_THREAD_SUPPORT
static void* job_thread_resolve_name(void* ptr) static void* job_thread_resolve_name(void* ptr)
{ {
struct net_dns_job* job = (struct net_dns_job*) ptr; struct net_dns_job* job = (struct net_dns_job*) ptr;
@ -203,14 +190,14 @@ static void* job_thread_resolve_name(void* ptr)
gettimeofday(&job->time_finish, NULL); gettimeofday(&job->time_finish, NULL);
#endif #endif
pthread_mutex_lock(&g_dns->mutex); uhub_mutex_lock(&g_dns->mutex);
list_remove(g_dns->jobs, job); list_remove(g_dns->jobs, job);
list_append(g_dns->results, dns_results); list_append(g_dns->results, dns_results);
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
return dns_results; return dns_results;
} }
#endif
extern struct net_dns_job* net_dns_gethostbyname(const char* host, int af, net_dns_job_cb callback, void* ptr) extern struct net_dns_job* net_dns_gethostbyname(const char* host, int af, net_dns_job_cb callback, void* ptr)
{ {
@ -225,12 +212,11 @@ extern struct net_dns_job* net_dns_gethostbyname(const char* host, int af, net_d
#endif #endif
// FIXME - scheduling - what about a max number of threads? // FIXME - scheduling - what about a max number of threads?
#ifdef POSIX_THREAD_SUPPORT uhub_mutex_lock(&g_dns->mutex);
pthread_mutex_lock(&g_dns->mutex); job->thread_handle = uhub_thread_create(job_thread_resolve_name, job);
int err = pthread_create(&job->thread_handle, NULL, job_thread_resolve_name, job); if (!job->thread_handle)
if (err)
{ {
LOG_WARN("Unable to create thread: (%d) %s", err, strerror(err)); LOG_WARN("Unable to create thread");
free_job(job); free_job(job);
job = NULL; job = NULL;
} }
@ -238,8 +224,7 @@ extern struct net_dns_job* net_dns_gethostbyname(const char* host, int af, net_d
{ {
list_append(g_dns->jobs, job); list_append(g_dns->jobs, job);
} }
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
#endif
return job; return job;
} }
@ -253,13 +238,12 @@ extern struct net_dns_job* net_dns_gethostbyaddr(struct ip_addr_encap* ipaddr, n
job->callback = callback; job->callback = callback;
job->ptr = ptr; job->ptr = ptr;
#ifdef POSIX_THREAD_SUPPORT
// if (pthread_create(&job->thread_handle, NULL, start_job, job)) // if (pthread_create(&job->thread_handle, NULL, start_job, job))
// { // {
// free_job(job); // free_job(job);
// return NULL; // return NULL;
// } // }
#endif
return job; return job;
} }
@ -296,7 +280,6 @@ static struct net_dns_result* find_and_remove_result(struct net_dns_job* job)
extern int net_dns_job_cancel(struct net_dns_job* job) extern int net_dns_job_cancel(struct net_dns_job* job)
{ {
#ifdef POSIX_THREAD_SUPPORT
int retval = 0; int retval = 0;
struct net_dns_result* res; struct net_dns_result* res;
@ -310,24 +293,23 @@ extern int net_dns_job_cancel(struct net_dns_job* job)
* If the job is already finished, but the result has not been delivered, then this * If the job is already finished, but the result has not been delivered, then this
* deletes the result and the job. * deletes the result and the job.
*/ */
pthread_mutex_lock(&g_dns->mutex); uhub_mutex_lock(&g_dns->mutex);
if (find_and_remove_job(job)) if (find_and_remove_job(job))
{ {
// job still active - cancel it, then close it. // job still active - cancel it, then close it.
pthread_cancel(job->thread_handle); uhub_thread_cancel(job->thread_handle);
pthread_join(job->thread_handle, NULL); uhub_thread_join(job->thread_handle);
free_job(job); free_job(job);
retval = 1; retval = 1;
} }
else if ((res = find_and_remove_result(job))) else if ((res = find_and_remove_result(job)))
{ {
// job already finished - close it. // job already finished - close it.
pthread_join(job->thread_handle, NULL); uhub_thread_join(job->thread_handle);
net_dns_result_free(res); net_dns_result_free(res);
} }
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
return retval; return retval;
#endif
} }
extern struct net_dns_result* net_dns_job_sync_wait(struct net_dns_job* job) extern struct net_dns_result* net_dns_job_sync_wait(struct net_dns_job* job)
@ -337,16 +319,15 @@ extern struct net_dns_result* net_dns_job_sync_wait(struct net_dns_job* job)
// Wait for job to finish (if not already) // Wait for job to finish (if not already)
// This should make sure the job is removed from jobs and a result is // This should make sure the job is removed from jobs and a result is
// present in results. // present in results.
pthread_join(job->thread_handle, NULL); uhub_thread_join(job->thread_handle);
// Remove the result in order to prevent the callback from being called. // Remove the result in order to prevent the callback from being called.
pthread_mutex_lock(&g_dns->mutex); uhub_mutex_lock(&g_dns->mutex);
res = find_and_remove_result(job); res = find_and_remove_result(job);
uhub_assert(res != NULL); uhub_assert(res != NULL);
res->job = NULL; res->job = NULL;
free_job(job); free_job(job);
pthread_mutex_unlock(&g_dns->mutex); uhub_mutex_unlock(&g_dns->mutex);
return res; return res;
} }