提交 4517a511 authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9775: Major cleanup around allocations, more work on search functionality,…

FS-9775: Major cleanup around allocations, more work on search functionality, adjusted polling to reduce timeout when outgoing messages are pending
上级 d7222718
...@@ -220,7 +220,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e ...@@ -220,7 +220,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid); KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data); KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data);
KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message);
...@@ -242,80 +241,60 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag ...@@ -242,80 +241,60 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool); ks_pool_t *pool,
KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram);
KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram,
ks_dht_t *dht, ks_dht_t *dht,
ks_dht_endpoint_t *endpoint, ks_dht_endpoint_t *endpoint,
const ks_sockaddr_t *raddr); const ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram); KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram);
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_endpoint_alloc(ks_dht_endpoint_t **endpoint, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
KS_DECLARE(ks_status_t) ks_dht_endpoint_prealloc(ks_dht_endpoint_t *endpoint, ks_pool_t *pool); ks_pool_t *pool,
KS_DECLARE(ks_status_t) ks_dht_endpoint_free(ks_dht_endpoint_t **endpoint);
KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint,
const ks_dht_nodeid_t *nodeid, const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *addr, const ks_sockaddr_t *addr,
ks_socket_t sock); ks_socket_t sock);
KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint); KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint);
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target);
KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool); KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search);
KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search);
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target);
KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search);
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback); KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid);
KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool); KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending);
KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending);
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node);
KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending);
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_storageitem_alloc(ks_dht_storageitem_t **item, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v);
KS_DECLARE(ks_status_t) ks_dht_storageitem_prealloc(ks_dht_storageitem_t *item, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
KS_DECLARE(ks_status_t) ks_dht_storageitem_free(ks_dht_storageitem_t **item); ks_pool_t *pool,
struct bencode *v,
KS_DECLARE(ks_status_t) ks_dht_storageitem_init(ks_dht_storageitem_t *item, struct bencode *v);
KS_DECLARE(ks_status_t) ks_dht_storageitem_deinit(ks_dht_storageitem_t *item);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create(ks_dht_storageitem_t *item, ks_bool_t mutable);
KS_DECLARE(ks_status_t) ks_dht_storageitem_immutable(ks_dht_storageitem_t *item);
KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item,
ks_dht_storageitem_key_t *k, ks_dht_storageitem_key_t *k,
uint8_t *salt, uint8_t *salt,
ks_size_t salt_length, ks_size_t salt_length,
int64_t sequence, int64_t sequence,
ks_dht_storageitem_signature_t *signature); ks_dht_storageitem_signature_t *signature);
KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);
/** /**
* *
*/ */
//KS_DECLARE(ks_status_t) ks_dht_node_alloc(ks_dht_node_t **node, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction,
//KS_DECLARE(ks_status_t) ks_dht_node_prealloc(ks_dht_node_t *node, ks_pool_t *pool); ks_pool_t *pool,
//KS_DECLARE(ks_status_t) ks_dht_node_free(ks_dht_node_t *node); ks_sockaddr_t *raddr,
uint32_t transactionid,
//KS_DECLARE(ks_status_t) ks_dht_node_init(ks_dht_node_t *node, const ks_dht_nodeid_t *id, const ks_sockaddr_t *addr); ks_dht_message_callback_t callback);
//KS_DECLARE(ks_status_t) ks_dht_node_deinit(ks_dht_node_t *node); KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction);
//KS_DECLARE(ks_status_t) ks_dht_node_address_check(ks_dht_node_t *node, const ks_sockaddr_t *addr);
//KS_DECLARE(ks_bool_t) ks_dht_node_address_exists(ks_dht_node_t *node, const ks_sockaddr_t *addr);
//KS_DECLARE(ks_status_t) ks_dht_node_address_add(ks_dht_node_t *node, const ks_sockaddr_t *addr);
KS_END_EXTERN_C KS_END_EXTERN_C
......
...@@ -124,6 +124,7 @@ struct ks_dht_message_s { ...@@ -124,6 +124,7 @@ struct ks_dht_message_s {
struct bencode *data; struct bencode *data;
uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE]; uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
ks_size_t transactionid_length; ks_size_t transactionid_length;
ks_dht_transaction_t *transaction;
char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE]; char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
struct bencode *args; struct bencode *args;
}; };
...@@ -166,13 +167,14 @@ struct ks_dht_search_s { ...@@ -166,13 +167,14 @@ struct ks_dht_search_s {
ks_dht_search_callback_t *callbacks; ks_dht_search_callback_t *callbacks;
ks_size_t callbacks_size; ks_size_t callbacks_size;
ks_hash_t *pending; ks_hash_t *pending;
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; // @todo change this to track the nodeid only, and obtain the nodes only if/when needed ks_dht_nodeid_t results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_size_t results_length; ks_size_t results_length;
}; };
struct ks_dht_search_pending_s { struct ks_dht_search_pending_s {
ks_pool_t *pool; ks_pool_t *pool;
ks_dht_node_t *node; // @todo change this to track the nodeid only, and obtain the node only if/when needed ks_dht_nodeid_t nodeid;
ks_time_t expiration; ks_time_t expiration;
ks_bool_t finished; ks_bool_t finished;
}; };
...@@ -221,6 +223,7 @@ struct ks_dht_s { ...@@ -221,6 +223,7 @@ struct ks_dht_s {
uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
ks_size_t recv_buffer_length; ks_size_t recv_buffer_length;
ks_mutex_t *tid_mutex;
volatile uint32_t transactionid_next; volatile uint32_t transactionid_next;
ks_hash_t *transactions_hash; ks_hash_t *transactions_hash;
...@@ -235,62 +238,22 @@ struct ks_dht_s { ...@@ -235,62 +238,22 @@ struct ks_dht_s {
ks_hash_t *storage_hash; ks_hash_t *storage_hash;
}; };
/**
* Allocator function for ks_dht_t.
* Should be used when a ks_dht_t is allocated on the heap, and may provide an external memory pool or allocate one internally.
* @param dht dereferenced out pointer to the allocated dht instance
* @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new pool internally
* @param The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM
*/
KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool);
/**
* Preallocator function for ks_dht_t.
* Should be used when a ks_dht_t is preallocated on the stack or within another structure, and must provide an external memory pool.
* @param dht pointer to the dht instance
* @param pool pointer to the memory pool used by the dht instance
*/
KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool);
/**
* Deallocator function for ks_dht_t.
* Must be used when a ks_dht_t is allocated using ks_dht_alloc, will also destroy memory pool if it was created internally.
* @param dht dereferenced in/out pointer to the dht instance, NULL upon return
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_dht_deinit
* @see ks_pool_free
* @see ks_pool_close
*/
KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht);
/** /**
* Constructor function for ks_dht_t. * Constructor function for ks_dht_t.
* Must be used regardless of how ks_dht_t is allocated, will allocate and initialize internal state including registration of message handlers. * Will allocate and initialize internal state including registration of message handlers.
* @param dht pointer to the dht instance * @param dht dereferenced out pointer to the allocated dht instance
* @param tpool pointer to a thread pool, may be NULL to create a new thread pool internally * @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new memory pool internally
* @return The ks_status_t result: KS_STATUS_SUCCESS, ... * @param tpool pointer to a thread pool used by the dht instance, may be NULL to create a new thread pool internally
* @see ks_hash_create * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM
* @see ks_dht_register_type
* @see ks_q_create
*/ */
KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool); KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool);
/** /**
* Destructor function for ks_dht_t. * Destructor function for ks_dht_t.
* Must be used regardless of how ks_dht_t is allocated, will deallocate and deinitialize internal state. * Will deinitialize and deallocate internal state.
* @param dht pointer to the dht instance * @param dht dereferenced in/out pointer to the dht instance, NULL upon return
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_dht_storageitem_deinit
* @see ks_dht_storageitem_free
* @see ks_hash_destroy
* @see ks_dht_message_deinit
* @see ks_dht_message_free
* @see ks_q_destroy
* @see ks_dht_endpoint_deinit
* @see ks_dht_endpoint_free
* @see ks_pool_free
*/ */
KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht); KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht);
/** /**
* Enable or disable (default) autorouting support. * Enable or disable (default) autorouting support.
...@@ -361,27 +324,15 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); ...@@ -361,27 +324,15 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_message_alloc(ks_dht_message_t **message, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool,
/** ks_dht_endpoint_t *endpoint,
* ks_sockaddr_t *raddr,
*/ ks_bool_t alloc_data);
KS_DECLARE(ks_status_t) ks_dht_message_prealloc(ks_dht_message_t *message, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_free(ks_dht_message_t **message);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_bool_t alloc_data);
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_message_deinit(ks_dht_message_t *message); KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
/** /**
* *
...@@ -412,32 +363,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, ...@@ -412,32 +363,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
ks_size_t transactionid_length, ks_size_t transactionid_length,
struct bencode **args); struct bencode **args);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_prealloc(ks_dht_transaction_t *transaction, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_free(ks_dht_transaction_t **transaction);
KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transaction,
ks_sockaddr_t *raddr,
uint32_t transactionid,
ks_dht_message_callback_t callback);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction);
/** /**
* route table methods * route table methods
......
...@@ -2,74 +2,57 @@ ...@@ -2,74 +2,57 @@
#include "ks_dht-int.h" #include "ks_dht-int.h"
#include "sodium.h" #include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool) KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
ks_pool_t *pool,
ks_dht_t *dht,
ks_dht_endpoint_t *endpoint,
const ks_sockaddr_t *raddr)
{ {
ks_dht_datagram_t *dg; ks_dht_datagram_t *dg;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(datagram); ks_assert(datagram);
ks_assert(pool); ks_assert(pool);
ks_assert(dht);
ks_assert(endpoint);
ks_assert(raddr);
ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
*datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t));
if (!dg) {
ret = KS_STATUS_NO_MEM;
goto done;
}
dg->pool = pool; dg->pool = pool;
return KS_STATUS_SUCCESS; dg->dht = dht;
} dg->endpoint = endpoint;
dg->raddr = *raddr;
KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool)
{
ks_assert(datagram);
ks_assert(pool);
memset(datagram, 0, sizeof(ks_dht_datagram_t)); memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length);
dg->buffer_length = dht->recv_buffer_length;
datagram->pool = pool;
}
KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram)
{
ks_assert(datagram);
ks_assert(*datagram);
ks_dht_datagram_deinit(*datagram);
ks_pool_free((*datagram)->pool, *datagram);
done:
if (ret != KS_STATUS_SUCCESS) {
if (dg) ks_dht_datagram_destroy(&dg);
*datagram = NULL; *datagram = NULL;
}
return KS_STATUS_SUCCESS; return ret;
} }
KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram)
KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram, ks_dht_t *dht, ks_dht_endpoint_t *endpoint, const ks_sockaddr_t *raddr)
{ {
ks_assert(datagram); ks_dht_datagram_t *dg;
ks_assert(datagram->pool);
ks_assert(dht);
ks_assert(endpoint);
ks_assert(raddr);
ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
datagram->dht = dht;
datagram->endpoint = endpoint;
datagram->raddr = *raddr;
memcpy(datagram->buffer, dht->recv_buffer, dht->recv_buffer_length);
datagram->buffer_length = dht->recv_buffer_length;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram)
{
ks_assert(datagram); ks_assert(datagram);
ks_assert(*datagram);
datagram->buffer_length = 0; dg = *datagram;
datagram->raddr = (const ks_sockaddr_t){ 0 };
datagram->endpoint = NULL;
datagram->dht = NULL;
return KS_STATUS_SUCCESS; ks_pool_free(dg->pool, dg);
}
*datagram = NULL;
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
......
...@@ -5,87 +5,60 @@ ...@@ -5,87 +5,60 @@
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_endpoint_alloc(ks_dht_endpoint_t **endpoint, ks_pool_t *pool) KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_pool_t *pool,
const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *addr,
ks_socket_t sock)
{ {
ks_dht_endpoint_t *ep; ks_dht_endpoint_t *ep;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(endpoint); ks_assert(endpoint);
ks_assert(pool); ks_assert(pool);
ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
*endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t)); *endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t));
if (!ep) {
ret = KS_STATUS_NO_MEM;
goto done;
}
ep->pool = pool; ep->pool = pool;
ep->sock = KS_SOCK_INVALID; if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE);
else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
return KS_STATUS_SUCCESS; ep->addr = *addr;
ep->sock = sock;
done:
if (ret != KS_STATUS_SUCCESS) {
if (ep) ks_dht_endpoint_destroy(&ep);
*endpoint = NULL;
}
return ret;
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_endpoint_prealloc(ks_dht_endpoint_t *endpoint, ks_pool_t *pool) KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint)
{ {
ks_assert(endpoint); ks_dht_endpoint_t *ep;
ks_assert(pool);
memset(endpoint, 0, sizeof(ks_dht_endpoint_t));
endpoint->pool = pool;
endpoint->sock = KS_SOCK_INVALID;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_endpoint_free(ks_dht_endpoint_t **endpoint)
{
ks_assert(endpoint); ks_assert(endpoint);
ks_assert(*endpoint); ks_assert(*endpoint);
ks_dht_endpoint_deinit(*endpoint); ep = *endpoint;
ks_pool_free((*endpoint)->pool, *endpoint);
*endpoint = NULL;
return KS_STATUS_SUCCESS;
}
/** if (ep->node) {
* // @todo release the node?
*/ }
KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_socket_t sock) if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock);
{ ks_pool_free(ep->pool, ep);
ks_assert(endpoint);
ks_assert(endpoint->pool);
ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
if (!nodeid) randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE);
else memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
endpoint->addr = *addr;
endpoint->sock = sock;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint)
{
ks_assert(endpoint);
endpoint->node = NULL;
if (endpoint->sock != KS_SOCK_INVALID) ks_socket_close(&endpoint->sock);
endpoint->addr = (const ks_sockaddr_t){ 0 };
return KS_STATUS_SUCCESS; *endpoint = NULL;
} }
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
* mode:c * mode:c
......
...@@ -4,86 +4,59 @@ ...@@ -4,86 +4,59 @@
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_message_alloc(ks_dht_message_t **message, ks_pool_t *pool) KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool,
ks_dht_endpoint_t *endpoint,
ks_sockaddr_t *raddr,
ks_bool_t alloc_data)
{ {
ks_dht_message_t *msg; ks_dht_message_t *m;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(message); ks_assert(message);
ks_assert(pool); ks_assert(pool);
*message = msg = ks_pool_alloc(pool, sizeof(ks_dht_message_t)); *message = m = ks_pool_alloc(pool, sizeof(ks_dht_message_t));
msg->pool = pool; if (!m) {
ret = KS_STATUS_NO_MEM;
return KS_STATUS_SUCCESS; goto done;
} }
m->pool = pool;
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_prealloc(ks_dht_message_t *message, ks_pool_t *pool)
{
ks_assert(message);
ks_assert(pool);
memset(message, 0, sizeof(ks_dht_message_t));
message->pool = pool;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_free(ks_dht_message_t **message)
{
ks_assert(message);
ks_assert(*message);
ks_dht_message_deinit(*message); m->endpoint = endpoint;
ks_pool_free((*message)->pool, *message); m->raddr = *raddr;
if (alloc_data) m->data = ben_dict();
done:
if (ret != KS_STATUS_SUCCESS) {
if (m) ks_dht_message_destroy(&m);
*message = NULL; *message = NULL;
}
return KS_STATUS_SUCCESS; return ret;
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_bool_t alloc_data) KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message)
{ {
ks_assert(message); ks_dht_message_t *m;
ks_assert(message->pool);
message->endpoint = ep;
message->raddr = *raddr;
if (alloc_data) message->data = ben_dict();
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_deinit(ks_dht_message_t *message)
{
ks_assert(message); ks_assert(message);
ks_assert(*message);
m = *message;
message->endpoint = NULL; if (m->data) {
message->raddr = (const ks_sockaddr_t){ 0 }; ben_free(m->data);
message->args = NULL; m->data = NULL;
message->type[0] = '\0';
message->transactionid_length = 0;
if (message->data) {
ben_free(message->data);
message->data = NULL;
} }
ks_pool_free(m->pool, *message);
return KS_STATUS_SUCCESS; *message = NULL;
} }
/** /**
* *
*/ */
...@@ -104,7 +77,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui ...@@ -104,7 +77,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
message->data = ben_decode((const void *)buffer, buffer_length); message->data = ben_decode((const void *)buffer, buffer_length);
if (!message->data) { if (!message->data) {
ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n"); ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
goto failure; return KS_STATUS_FAIL;
} }
ks_log(KS_LOG_DEBUG, "Message decoded\n"); ks_log(KS_LOG_DEBUG, "Message decoded\n");
...@@ -113,14 +86,14 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui ...@@ -113,14 +86,14 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
t = ben_dict_get_by_str(message->data, "t"); t = ben_dict_get_by_str(message->data, "t");
if (!t) { if (!t) {
ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n"); ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n");
goto failure; return KS_STATUS_FAIL;
} }
tv = ben_str_val(t); tv = ben_str_val(t);
tv_len = ben_str_len(t); tv_len = ben_str_len(t);
if (tv_len > KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE) { if (tv_len > KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len); ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len);
goto failure; return KS_STATUS_FAIL;
} }
memcpy(message->transactionid, tv, tv_len); memcpy(message->transactionid, tv, tv_len);
...@@ -131,14 +104,14 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui ...@@ -131,14 +104,14 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
y = ben_dict_get_by_str(message->data, "y"); y = ben_dict_get_by_str(message->data, "y");
if (!y) { if (!y) {
ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n"); ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n");
goto failure; return KS_STATUS_FAIL;
} }
yv = ben_str_val(y); yv = ben_str_val(y);
yv_len = ben_str_len(y); yv_len = ben_str_len(y);
if (yv_len >= KS_DHT_MESSAGE_TYPE_MAX_SIZE) { if (yv_len >= KS_DHT_MESSAGE_TYPE_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len); ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len);
goto failure; return KS_STATUS_FAIL;
} }
memcpy(message->type, yv, yv_len); memcpy(message->type, yv, yv_len);
...@@ -146,10 +119,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui ...@@ -146,10 +119,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type); ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
failure:
ks_dht_message_deinit(message);
return KS_STATUS_FAIL;
} }
/** /**
......
...@@ -5,104 +5,70 @@ ...@@ -5,104 +5,70 @@
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool) KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target)
{ {
ks_dht_search_t *s; ks_dht_search_t *s;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(search); ks_assert(search);
ks_assert(pool); ks_assert(pool);
ks_assert(target);
*search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t)); *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t));
if (!s) {
ret = KS_STATUS_NO_MEM;
goto done;
}
s->pool = pool; s->pool = pool;
return KS_STATUS_SUCCESS; if ((ret = ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool)) != KS_STATUS_SUCCESS) goto done;
} memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE);
/**
*
*/
KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool)
{
ks_assert(search);
ks_assert(pool);
memset(search, 0, sizeof(ks_dht_search_t));
search->pool = pool;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(search);
ks_assert(*search);
if ((ret = ks_dht_search_deinit(*search)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_pool_free((*search)->pool, *search)) != KS_STATUS_SUCCESS) return ret;
*search = NULL;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(search);
ks_assert(search->pool);
ks_assert(target);
if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret;
memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE);
if ((ret = ks_hash_create(&search->pending, if ((ret = ks_hash_create(&s->pending,
KS_HASH_MODE_ARBITRARY, KS_HASH_MODE_ARBITRARY,
KS_HASH_FLAG_RWLOCK, KS_HASH_FLAG_RWLOCK,
search->pool)) != KS_STATUS_SUCCESS) return ret; s->pool)) != KS_STATUS_SUCCESS) goto done;
ks_hash_set_keysize(search->pending, KS_DHT_NODEID_SIZE); ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE);
done:
if (ret != KS_STATUS_SUCCESS) {
if (s) ks_dht_search_destroy(&s);
*search = NULL;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search) KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
{ {
ks_dht_search_t *s;
ks_hash_iterator_t *it; ks_hash_iterator_t *it;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(search); ks_assert(search);
ks_assert(*search);
s = *search;
search->results_length = 0; if (s->pending) {
if (search->pending) { for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
for (it = ks_hash_first(search->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key;
ks_dht_search_pending_t *val; ks_dht_search_pending_t *val;
ks_hash_this(it, &key, NULL, (void **)&val); ks_hash_this_val(it, (void **)&val);
if ((ret = ks_dht_search_pending_deinit(val)) != KS_STATUS_SUCCESS) return ret; ks_dht_search_pending_destroy(&val);
if ((ret = ks_dht_search_pending_free(&val)) != KS_STATUS_SUCCESS) return ret;
} }
ks_hash_destroy(&search->pending); ks_hash_destroy(&s->pending);
} }
search->callbacks_size = 0; if (s->callbacks) {
if (search->callbacks) { ks_pool_free(s->pool, s->callbacks);
if ((ret = ks_pool_free(search->pool, search->callbacks)) != KS_STATUS_SUCCESS) return ret; s->callbacks = NULL;
search->callbacks = NULL;
} }
if (search->mutex && (ret = ks_mutex_destroy(&search->mutex)) != KS_STATUS_SUCCESS) return ret; if (s->mutex) ks_mutex_destroy(&s->mutex);
return KS_STATUS_SUCCESS; ks_pool_free(s->pool, s);
*search = NULL;
} }
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback) KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback)
...@@ -111,77 +77,58 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d ...@@ -111,77 +77,58 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d
if (callback) { if (callback) {
int32_t index; int32_t index;
// @todo lock mutex
ks_mutex_lock(search->mutex);
index = search->callbacks_size++; index = search->callbacks_size++;
search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
(void *)search->callbacks, (void *)search->callbacks,
sizeof(ks_dht_search_callback_t) * search->callbacks_size); sizeof(ks_dht_search_callback_t) * search->callbacks_size);
if (!search->callbacks) return KS_STATUS_NO_MEM;
search->callbacks[index] = callback; search->callbacks[index] = callback;
// @todo unlock mutex ks_mutex_unlock(search->mutex);
} }
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool) KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid)
{ {
ks_dht_search_pending_t *p; ks_dht_search_pending_t *p;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(pending); ks_assert(pending);
ks_assert(pool); ks_assert(pool);
*pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t));
if (!p) {
ret = KS_STATUS_NO_MEM;
goto done;
}
p->pool = pool; p->pool = pool;
return KS_STATUS_SUCCESS; p->nodeid = *nodeid;
} p->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION;
p->finished = KS_FALSE;
KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool)
{
ks_assert(pending);
ks_assert(pool);
memset(pending, 0, sizeof(ks_dht_search_pending_t));
pending->pool = pool;
}
KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(pending);
ks_assert(*pending);
if ((ret = ks_dht_search_pending_deinit(*pending)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_pool_free((*pending)->pool, *pending)) != KS_STATUS_SUCCESS) return ret;
done:
if (ret != KS_STATUS_SUCCESS) {
if (p) ks_dht_search_pending_destroy(&p);
*pending = NULL; *pending = NULL;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node) KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending)
{ {
ks_assert(pending); ks_dht_search_pending_t *p;
ks_assert(pending->pool);
ks_assert(node);
pending->node = node;
pending->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION;
pending->finished = KS_FALSE;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending)
{
ks_assert(pending); ks_assert(pending);
ks_assert(*pending);
pending->node = NULL; p = *pending;
pending->expiration = 0;
pending->finished = KS_FALSE;
return KS_STATUS_SUCCESS; ks_pool_free(p->pool, p);
*pending = NULL;
} }
/* For Emacs: /* For Emacs:
......
...@@ -5,190 +5,121 @@ ...@@ -5,190 +5,121 @@
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_storageitem_alloc(ks_dht_storageitem_t **item, ks_pool_t *pool) KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v)
{ {
ks_dht_storageitem_t *si; ks_dht_storageitem_t *si;
SHA_CTX sha;
size_t enc_len = 0;
uint8_t *enc = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(item); ks_assert(item);
ks_assert(pool); ks_assert(pool);
ks_assert(v);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
*item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
if (!si) {
ret = KS_STATUS_NO_MEM;
goto done;
}
si->pool = pool; si->pool = pool;
return KS_STATUS_SUCCESS; si->mutable = KS_FALSE;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_prealloc(ks_dht_storageitem_t *item, ks_pool_t *pool)
{
ks_assert(item);
ks_assert(pool);
memset(item, 0, sizeof(ks_dht_storageitem_t));
item->pool = pool; si->v = ben_clone(v);
if (!si->v) {
return KS_STATUS_SUCCESS; ret = KS_STATUS_NO_MEM;
} goto done;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_free(ks_dht_storageitem_t **item)
{
ks_assert(item);
ks_assert(*item);
ks_dht_storageitem_deinit(*item); enc = ben_encode(&enc_len, si->v);
ks_pool_free((*item)->pool, *item); SHA1_Init(&sha);
SHA1_Update(&sha, enc, enc_len);
SHA1_Final(si->id.id, &sha);
free(enc);
done:
if (ret != KS_STATUS_SUCCESS) {
if (si) ks_dht_storageitem_destroy(&si);
*item = NULL; *item = NULL;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_init(ks_dht_storageitem_t *item, struct bencode *v)
{
ks_assert(item);
ks_assert(item->pool);
ks_assert(v);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
item->v = ben_clone(v);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_deinit(ks_dht_storageitem_t *item)
{
ks_assert(item);
if (item->v) {
ben_free(item->v);
item->v = NULL;
} }
return ret;
return KS_STATUS_SUCCESS;
} }
/** KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
* ks_pool_t *pool,
*/ struct bencode *v,
KS_DECLARE(ks_status_t) ks_dht_storageitem_create(ks_dht_storageitem_t *item, ks_bool_t mutable) ks_dht_storageitem_key_t *k,
uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
ks_dht_storageitem_signature_t *signature)
{ {
ks_dht_storageitem_t *si;
SHA_CTX sha; SHA_CTX sha;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(item); ks_assert(item);
ks_assert(item->pool); ks_assert(pool);
ks_assert(item->v); ks_assert(v);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
item->mutable = mutable; ks_assert(k);
ks_assert(!(!salt && salt_length > 0));
if (!mutable) { ks_assert(!(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
size_t enc_len = 0; ks_assert(signature);
uint8_t *enc = ben_encode(&enc_len, item->v);
SHA1_Init(&sha);
SHA1_Update(&sha, enc, enc_len);
SHA1_Final(item->id.id, &sha);
free(enc);
} else {
size_t enc_len = 0;
uint8_t *enc = NULL;
struct bencode *sig = ben_dict();
crypto_sign_keypair(item->pk.key, item->sk.key);
randombytes_buf(item->salt, KS_DHT_STORAGEITEM_SALT_MAX_SIZE);
item->salt_length = KS_DHT_STORAGEITEM_SALT_MAX_SIZE;
item->seq = 1;
ben_dict_set(sig, ben_blob("salt", 4), ben_blob(item->salt, item->salt_length));
ben_dict_set(sig, ben_blob("seq", 3), ben_int(item->seq));
ben_dict_set(sig, ben_blob("v", 1), ben_clone(item->v));
enc = ben_encode(&enc_len, sig);
ben_free(sig);
SHA1_Init(&sha);
SHA1_Update(&sha, enc, enc_len);
SHA1_Final(item->sig.sig, &sha);
free(enc);
SHA1_Init(&sha); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); if (!si) {
SHA1_Update(&sha, item->salt, item->salt_length); ret = KS_STATUS_NO_MEM;
SHA1_Final(item->id.id, &sha); goto done;
} }
si->pool = pool;
return KS_STATUS_SUCCESS; si->v = ben_clone(v);
}
/** si->mutable = KS_TRUE;
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_immutable(ks_dht_storageitem_t *item)
{
SHA_CTX sha;
size_t enc_len = 0;
uint8_t *enc = NULL;
ks_assert(item);
ks_assert(item->v);
item->mutable = KS_FALSE; memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE);
if (salt && salt_length > 0) {
memcpy(si->salt, salt, salt_length);
si->salt_length = salt_length;
}
si->seq = sequence;
memcpy(si->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
enc = ben_encode(&enc_len, item->v);
SHA1_Init(&sha); SHA1_Init(&sha);
SHA1_Update(&sha, enc, enc_len); SHA1_Update(&sha, si->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
SHA1_Final(item->id.id, &sha); if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length);
free(enc); SHA1_Final(si->id.id, &sha);
return KS_STATUS_SUCCESS; done:
if (ret != KS_STATUS_SUCCESS) {
if (si) ks_dht_storageitem_destroy(&si);
*item = NULL;
}
return ret;
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item, KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
ks_dht_storageitem_key_t *k,
uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
ks_dht_storageitem_signature_t *signature)
{ {
SHA_CTX sha; ks_dht_storageitem_t *si;
ks_assert(item); ks_assert(item);
ks_assert(item->v); ks_assert(*item);
ks_assert(k);
ks_assert(!(!salt && salt_length > 0));
ks_assert(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
ks_assert(signature);
item->mutable = KS_TRUE; si = *item;
memcpy(item->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE); if (si->v) {
if (salt && salt_length > 0) { ben_free(si->v);
memcpy(item->salt, salt, salt_length); si->v = NULL;
item->salt_length = salt_length;
} }
item->seq = sequence; ks_pool_free(si->pool, si);
memcpy(item->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
SHA1_Init(&sha); *item = NULL;
SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
if (item->salt && item->salt_length > 0) SHA1_Update(&sha, item->salt, item->salt_length);
SHA1_Final(item->id.id, &sha);
return KS_STATUS_SUCCESS;
} }
/* For Emacs: /* For Emacs:
......
#include "ks_dht.h" #include "ks_dht.h"
#include "ks_dht-int.h" #include "ks_dht-int.h"
/** KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction,
* ks_pool_t *pool,
*/ ks_sockaddr_t *raddr,
KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool) uint32_t transactionid,
ks_dht_message_callback_t callback)
{ {
ks_dht_transaction_t *tran; ks_dht_transaction_t *t;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(transaction); ks_assert(transaction);
ks_assert(pool); ks_assert(pool);
ks_assert(raddr);
*transaction = tran = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t)); *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t));
tran->pool = pool; if (!t) {
ret = KS_STATUS_NO_MEM;
return KS_STATUS_SUCCESS; goto done;
}
t->pool = pool;
t->raddr = *raddr;
t->transactionid = transactionid;
t->callback = callback;
t->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY;
done:
if (ret != KS_STATUS_SUCCESS) {
if (t) ks_dht_transaction_destroy(&t);
*transaction = NULL;
}
return ret;
} }
/** KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction)
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_prealloc(ks_dht_transaction_t *transaction, ks_pool_t *pool)
{ {
ks_assert(transaction); ks_dht_transaction_t *t;
ks_assert(pool);
memset(transaction, 0, sizeof(ks_dht_transaction_t));
transaction->pool = pool;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_free(ks_dht_transaction_t **transaction)
{
ks_assert(transaction); ks_assert(transaction);
ks_assert(*transaction); ks_assert(*transaction);
ks_dht_transaction_deinit(*transaction); t = *transaction;
ks_pool_free((*transaction)->pool, *transaction);
*transaction = NULL;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transaction,
ks_sockaddr_t *raddr,
uint32_t transactionid,
ks_dht_message_callback_t callback)
{
ks_assert(transaction);
ks_assert(raddr);
ks_assert(transaction->pool);
ks_assert(callback);
transaction->raddr = *raddr; ks_pool_free(t->pool, t);
transaction->transactionid = transactionid;
transaction->callback = callback;
transaction->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY;
transaction->finished = KS_FALSE;
return KS_STATUS_SUCCESS; *transaction = NULL;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction)
{
ks_assert(transaction);
transaction->raddr = (const ks_sockaddr_t){ 0 };
transaction->transactionid = 0;
transaction->callback = NULL;
transaction->expiration = 0;
transaction->finished = KS_FALSE;
return KS_STATUS_SUCCESS;
} }
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
* mode:c * mode:c
......
...@@ -19,7 +19,7 @@ int main() { ...@@ -19,7 +19,7 @@ int main() {
ks_status_t err; ks_status_t err;
int mask = 0; int mask = 0;
ks_dht_t *dht1 = NULL; ks_dht_t *dht1 = NULL;
ks_dht_t dht2; ks_dht_t *dht2 = NULL;
ks_dht_t *dht3 = NULL; ks_dht_t *dht3 = NULL;
ks_dht_endpoint_t *ep1; ks_dht_endpoint_t *ep1;
ks_dht_endpoint_t *ep2; ks_dht_endpoint_t *ep2;
...@@ -53,21 +53,13 @@ int main() { ...@@ -53,21 +53,13 @@ int main() {
diag("Binding to %s on ipv6\n", v6); diag("Binding to %s on ipv6\n", v6);
} }
err = ks_dht_alloc(&dht1, NULL); err = ks_dht_create(&dht1, NULL, NULL);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_dht_init(dht1, NULL); err = ks_dht_create(&dht2, NULL, NULL);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
ks_dht_prealloc(&dht2, dht1->pool); err = ks_dht_create(&dht3, NULL, NULL);
err = ks_dht_init(&dht2, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_alloc(&dht3, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_init(dht3, NULL);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
...@@ -85,7 +77,7 @@ int main() { ...@@ -85,7 +77,7 @@ int main() {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET); err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(&dht2, NULL, &addr, &ep2); err = ks_dht_bind(dht2, NULL, &addr, &ep2);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
//raddr2 = addr; //raddr2 = addr;
...@@ -109,7 +101,7 @@ int main() { ...@@ -109,7 +101,7 @@ int main() {
err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 1, AF_INET6); err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 1, AF_INET6);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(&dht2, NULL, &addr, NULL); err = ks_dht_bind(dht2, NULL, &addr, NULL);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6); err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6);
...@@ -143,24 +135,24 @@ int main() { ...@@ -143,24 +135,24 @@ int main() {
diag("Ping test\n"); diag("Ping test\n");
ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1 ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1
ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1 ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1
ok(ks_dhtrt_find_node(dht2.rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
diag("DHT 1\n"); diag("DHT 1\n");
ks_dht_pulse(dht1, 100); ks_dht_pulse(dht1, 100);
diag("DHT 2\n"); diag("DHT 2\n");
ks_dht_pulse(&dht2, 100); ks_dht_pulse(dht2, 100);
} }
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
...@@ -180,35 +172,26 @@ int main() { ...@@ -180,35 +172,26 @@ int main() {
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
diag("DHT 1\n"); diag("DHT 1\n");
ks_dht_pulse(dht1, 100); ks_dht_pulse(dht1, 100);
diag("DHT 2\n"); diag("DHT 2\n");
ks_dht_pulse(&dht2, 100); ks_dht_pulse(dht2, 100);
} }
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
diag("Cleanup\n");
/* Cleanup and shutdown */
err = ks_dht_deinit(dht3);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_free(&dht3); /* Cleanup and shutdown */
ok(err == KS_STATUS_SUCCESS); diag("Cleanup\n");
err = ks_dht_deinit(&dht2); ks_dht_destroy(&dht3);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_deinit(dht1); ks_dht_destroy(&dht2);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_free(&dht1); ks_dht_destroy(&dht1);
ok(err == KS_STATUS_SUCCESS);
err = ks_shutdown(); ks_shutdown();
ok(err == KS_STATUS_SUCCESS);
done_testing(); done_testing();
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论