提交 e52a85eb authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9775: Added initialization of threadpool, and switched parsing and processing…

FS-9775: Added initialization of threadpool, and switched parsing and processing of a received datagram to dispatch through the threadpool. Also a bit more work on searches sneaking in here.
上级 2c5e4036
......@@ -13,8 +13,8 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k
libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_search.c
libks_la_SOURCES += src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
libks_la_SOURCES += src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c
#aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
......
......@@ -162,6 +162,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
* @param raddr pointer to the remote address
* @param query string value of the query type, for example "ping"
* @param callback callback to be called when response to transaction is received
* @param transaction dereferenced out pointer to the allocated transaction, may be NULL to ignore output
* @param message dereferenced out pointer to the allocated message
* @param args dereferenced out pointer to the allocated bencode args, may be NULL to ignore output
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
......@@ -178,6 +179,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
ks_sockaddr_t *raddr,
const char *query,
ks_dht_message_callback_t callback,
ks_dht_transaction_t **transaction,
ks_dht_message_t **message,
struct bencode **args);
......@@ -217,7 +219,8 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k
KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr);
KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data);
KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message);
......@@ -235,6 +238,20 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool);
KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram);
KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram,
ks_dht_t *dht,
ks_dht_endpoint_t *endpoint,
const ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram);
/**
*
*/
......@@ -255,9 +272,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t
KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search);
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search,
const ks_dht_nodeid_t *target,
ks_dht_search_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target);
KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search);
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
......@@ -266,7 +281,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pe
KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending);
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration);
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node);
KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending);
/**
......
......@@ -9,7 +9,15 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_DEFAULT_PORT 5309
#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
#define KS_DHT_TPOOL_MIN 2
#define KS_DHT_TPOOL_MAX 8
#define KS_DHT_TPOOL_STACK (1024 * 256)
#define KS_DHT_TPOOL_IDLE 10
#define KS_DHT_DATAGRAM_BUFFER_SIZE 1000
//#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
#define KS_DHT_PULSE_EXPIRATIONS 10
#define KS_DHT_NODEID_SIZE 20
......@@ -33,6 +41,7 @@ KS_BEGIN_EXTERN_C
#define KS_DHTRT_MAXQUERYSIZE 20
typedef struct ks_dht_s ks_dht_t;
typedef struct ks_dht_datagram_s ks_dht_datagram_t;
typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
typedef struct ks_dht_token_s ks_dht_token_t;
typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t;
......@@ -51,6 +60,15 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
struct ks_dht_datagram_s {
ks_pool_t *pool;
ks_dht_t *dht;
ks_dht_endpoint_t *endpoint;
ks_sockaddr_t raddr;
uint8_t buffer[KS_DHT_DATAGRAM_BUFFER_SIZE];
ks_size_t buffer_length;
};
/**
* Note: This must remain a structure for casting from raw data
*/
......@@ -115,6 +133,8 @@ struct ks_dht_endpoint_s {
ks_dht_nodeid_t nodeid;
ks_sockaddr_t addr;
ks_socket_t sock;
// @todo make sure this node is unlocked, and never gets destroyed, should also never use local nodes in search results as they can be internal
// network addresses, not what others have contacted through
ks_dht_node_t *node;
};
......@@ -122,6 +142,7 @@ struct ks_dht_transaction_s {
ks_pool_t *pool;
ks_sockaddr_t raddr;
uint32_t transactionid;
ks_dht_nodeid_t target;
ks_dht_message_callback_t callback;
ks_time_t expiration;
ks_bool_t finished;
......@@ -145,13 +166,13 @@ struct ks_dht_search_s {
ks_dht_search_callback_t *callbacks;
ks_size_t callbacks_size;
ks_hash_t *pending;
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; // @todo change this to track the nodeid only, and obtain the nodes only if/when needed
ks_size_t results_length;
};
struct ks_dht_search_pending_s {
ks_pool_t *pool;
ks_dht_node_t *node;
ks_dht_node_t *node; // @todo change this to track the nodeid only, and obtain the node only if/when needed
ks_time_t expiration;
ks_bool_t finished;
};
......@@ -175,6 +196,9 @@ struct ks_dht_s {
ks_pool_t *pool;
ks_bool_t pool_alloc;
ks_thread_pool_t *tpool;
ks_bool_t tpool_alloc;
ks_bool_t autoroute;
ks_port_t autoroute_port;
......@@ -194,7 +218,7 @@ struct ks_dht_s {
ks_q_t *send_q;
ks_dht_message_t *send_q_unsent;
uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE];
uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
ks_size_t recv_buffer_length;
volatile uint32_t transactionid_next;
......@@ -243,12 +267,13 @@ KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht);
* Constructor function for ks_dht_t.
* Must be used regardless of how ks_dht_t is allocated, will allocate and initialize internal state including registration of message handlers.
* @param dht pointer to the dht instance
* @param tpool pointer to a thread pool, may be NULL to create a new thread pool internally
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_hash_create
* @see ks_dht_register_type
* @see ks_q_create
*/
KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht);
KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool);
/**
* Destructor function for ks_dht_t.
......
#include "ks_dht.h"
#include "ks_dht-int.h"
#include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool)
{
ks_dht_datagram_t *dg;
ks_assert(datagram);
ks_assert(pool);
*datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t));
dg->pool = pool;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool)
{
ks_assert(datagram);
ks_assert(pool);
memset(datagram, 0, sizeof(ks_dht_datagram_t));
datagram->pool = pool;
}
KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram)
{
ks_assert(datagram);
ks_assert(*datagram);
ks_dht_datagram_deinit(*datagram);
ks_pool_free((*datagram)->pool, *datagram);
*datagram = NULL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram, ks_dht_t *dht, ks_dht_endpoint_t *endpoint, const ks_sockaddr_t *raddr)
{
ks_assert(datagram);
ks_assert(datagram->pool);
ks_assert(dht);
ks_assert(endpoint);
ks_assert(raddr);
ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
datagram->dht = dht;
datagram->endpoint = endpoint;
datagram->raddr = *raddr;
memcpy(datagram->buffer, dht->recv_buffer, dht->recv_buffer_length);
datagram->buffer_length = dht->recv_buffer_length;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram)
{
ks_assert(datagram);
datagram->buffer_length = 0;
datagram->raddr = (const ks_sockaddr_t){ 0 };
datagram->endpoint = NULL;
datagram->dht = NULL;
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
......@@ -53,7 +53,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search)
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback)
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target)
{
ks_status_t ret = KS_STATUS_SUCCESS;
......@@ -64,8 +64,6 @@ KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht
if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret;
memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE);
if (callback) ks_dht_search_callback_add(search, callback);
if ((ret = ks_hash_create(&search->pending,
KS_HASH_MODE_ARBITRARY,
KS_HASH_FLAG_RWLOCK,
......@@ -112,11 +110,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d
ks_assert(search);
if (callback) {
int32_t index = search->callbacks_size++;
int32_t index;
// @todo lock mutex
index = search->callbacks_size++;
search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
(void *)search->callbacks,
sizeof(ks_dht_search_callback_t) * search->callbacks_size);
search->callbacks[index] = callback;
// @todo unlock mutex
}
return KS_STATUS_SUCCESS;
}
......@@ -159,14 +160,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pen
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration)
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node)
{
ks_assert(pending);
ks_assert(pending->pool);
ks_assert(node);
pending->node = node;
pending->expiration = expiration;
pending->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION;
pending->finished = KS_FALSE;
return KS_STATUS_SUCCESS;
......
......@@ -56,18 +56,18 @@ int main() {
err = ks_dht_alloc(&dht1, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_init(dht1);
err = ks_dht_init(dht1, NULL);
ok(err == KS_STATUS_SUCCESS);
ks_dht_prealloc(&dht2, dht1->pool);
err = ks_dht_init(&dht2);
err = ks_dht_init(&dht2, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_alloc(&dht3, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_init(dht3);
err = ks_dht_init(dht3, NULL);
ok(err == KS_STATUS_SUCCESS);
......@@ -174,6 +174,8 @@ int main() {
ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论