提交 5f5f7f43 authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9775: Some cleanup and bug fixes in DHT, switched to using hash destructors,…

FS-9775: Some cleanup and bug fixes in DHT, switched to using hash destructors, and added sending of errors to most failed query scenarios
上级 c4ed1300
......@@ -22,11 +22,18 @@ KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
/**
* Called internally to expire various data.
* Handles purging of expired and finished transactions, rotating token secrets, etc.
* Called internally to expire search data.
* Handles completing and purging of finished searches.
* @param dht pointer to the dht instance
*/
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht);
/**
* Called internally to process job state machine.
* Handles completing and purging of finished jobs.
* @param dht pointer to the dht instance
*/
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht);
/**
* Called internally to send queued messages.
......@@ -35,6 +42,20 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
*/
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
/**
* Called internally to expire transactions.
* Handles purging of expired and finished transactions.
* @param dht pointer to the dht instance
*/
KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht);
/**
* Called internally to expire and cycle tokens.
* Handles cycling new secret entropy for token generation.
* @param dht pointer to the dht instance
*/
KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht);
/**
* Converts a ks_dht_nodeid_t into it's hex string representation.
* @param id pointer to the nodeid
......
......@@ -2,6 +2,12 @@
#include "ks_dht-int.h"
#include "sodium.h"
void ks_dht_endpoint_destructor(void *ptr) { ks_dht_endpoint_destroy((ks_dht_endpoint_t **)&ptr); }
void ks_dht_transaction_destructor(void *ptr) { ks_dht_transaction_destroy((ks_dht_transaction_t **)&ptr); }
void ks_dht_storageitem_destructor(void *ptr) { ks_dht_storageitem_destroy((ks_dht_storageitem_t **)&ptr); }
KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
ks_bool_t pool_alloc = !pool;
......@@ -52,7 +58,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/**
* Create the message type registry.
*/
ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_hash_create(&d->registry_type, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->registry_type);
/**
......@@ -65,7 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/**
* Create the message query registry.
*/
ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_hash_create(&d->registry_query, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->registry_query);
/**
......@@ -79,7 +85,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/**
* Create the message error registry.
*/
ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_hash_create(&d->registry_error, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->registry_error);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
......@@ -88,6 +94,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
*/
d->endpoints = NULL;
d->endpoints_length = 0;
d->endpoints_size = 0;
d->endpoints_poll = NULL;
......@@ -96,13 +103,20 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
* This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent
*/
ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool);
ks_hash_create_ex(&d->endpoints_hash,
2,
NULL,
NULL,
KS_HASH_MODE_CASE_INSENSITIVE,
KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
ks_dht_endpoint_destructor,
d->pool);
ks_assert(d->endpoints_hash);
/**
* Default expirations to not be checked for one pulse.
* Default transactions expirations to not be checked for one pulse.
*/
d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
d->transactions_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC);
/**
* Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
......@@ -132,8 +146,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/**
* Initialize the transaction id mutex, should use atomic increment instead
*/
ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
ks_assert(d->tid_mutex);
ks_mutex_create(&d->transactionid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
ks_assert(d->transactionid_mutex);
/**
* Initialize the first transaction id randomly, this doesn't really matter.
......@@ -144,7 +158,14 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* Create the hash to track pending transactions on queries that are pending responses.
* It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred.
*/
ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool);
ks_hash_create_ex(&d->transactions_hash,
16,
NULL,
NULL,
KS_HASH_MODE_INT,
KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
ks_dht_transaction_destructor,
d->pool);
ks_assert(d->transactions_hash);
/**
......@@ -163,12 +184,19 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
d->token_secret_current = d->token_secret_previous = rand();
d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
/**
* Create the hash to store arbitrary data for BEP44.
*/
ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_hash_create_ex(&d->storageitems_hash,
16,
NULL,
NULL,
KS_HASH_MODE_ARBITRARY,
KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
ks_dht_storageitem_destructor,
d->pool);
ks_assert(d->storageitems_hash);
/**
......@@ -191,7 +219,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
ks_dht_t *d = NULL;
ks_pool_t *pool = NULL;
ks_bool_t pool_alloc = KS_FALSE;
ks_hash_iterator_t *it = NULL;
ks_assert(dht);
ks_assert(*dht);
......@@ -201,15 +228,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
/**
* Cleanup the storageitems hash and it's contents if it is allocated.
*/
if (d->storageitems_hash) {
for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key = NULL;
ks_dht_storageitem_t *val = NULL;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_storageitem_destroy(&val);
}
ks_hash_destroy(&d->storageitems_hash);
}
if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
/**
* Zero out the opaque write token variables.
......@@ -229,7 +248,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
/**
* Cleanup the route tables if they are allocated.
* @todo check if endpoints need to be destroyed first to release the readlock on their node
*/
if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
......@@ -238,7 +256,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
* Cleanup the transactions mutex and hash if they are allocated.
*/
d->transactionid_next = 0;
if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex);
if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
/**
......@@ -272,15 +290,9 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
/**
* Probably don't need this
*/
d->pulse_expirations = 0;
d->transactions_pulse = 0;
/**
* Cleanup any endpoints that have been allocated.
*/
for (int32_t i = 0; i < d->endpoints_size; ++i) {
ks_dht_endpoint_t *ep = d->endpoints[i];
ks_dht_endpoint_destroy(&ep);
}
d->endpoints_length = 0;
d->endpoints_size = 0;
/**
......@@ -294,7 +306,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
/**
* Cleanup the endpoints hash if it is allocated.
* Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated.
*/
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
......@@ -383,8 +395,9 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_
/**
* Check if the endpoint has already been bound for the address we want to route through.
*/
ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
ks_hash_read_lock(dht->endpoints_hash);
ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED);
ks_hash_read_unlock(dht->endpoints_hash);
/**
* If the endpoint has not been bound, and autorouting is enabled then try to bind the new address.
......@@ -411,38 +424,44 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
ks_hash_write_lock(dht->registry_type);
ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
ks_hash_write_unlock(dht->registry_type);
}
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
ks_hash_write_lock(dht->registry_query);
ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
ks_hash_write_unlock(dht->registry_query);
}
KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
ks_hash_write_lock(dht->registry_error);
ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
ks_hash_write_unlock(dht->registry_error);
}
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
{
ks_dht_endpoint_t *ep = NULL;
ks_socket_t sock = KS_SOCK_INVALID;
ks_dht_endpoint_t *ep = NULL;
int32_t epindex = 0;
ks_status_t ret = KS_STATUS_SUCCESS;
......@@ -456,17 +475,21 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
*/
if (endpoint) *endpoint = NULL;
ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
ks_hash_read_unlock(dht->endpoints_hash);
if (ep) {
ks_hash_write_lock(dht->endpoints_hash);
if (ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_UNLOCKED)) {
ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
return KS_STATUS_FAIL;
ret = KS_STATUS_FAIL;
goto done;
}
/**
* Attempt to open a UDP datagram socket for the given address family.
*/
if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
ret = KS_STATUS_FAIL;
goto done;
}
/**
* Set some common socket options for non-blocking IO and forced binding when already in use
......@@ -486,31 +509,36 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ks_assert(ep);
/**
* Resize the endpoints array to take another endpoint pointer.
* Add the new endpoint into the endpoints hash for quick lookups.
*/
epindex = dht->endpoints_size++;
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
ks_assert(dht->endpoints);
dht->endpoints[epindex] = ep;
ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
/**
* Add the new endpoint into the endpoints hash for quick lookups.
* @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed
* Resize the endpoints array to take another endpoint pointer.
*/
if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done;
epindex = dht->endpoints_length++;
if (dht->endpoints_length > dht->endpoints_size) {
dht->endpoints_size = dht->endpoints_length;
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
ks_assert(dht->endpoints);
/**
* Resize the endpoints_poll array to keep in parallel with endpoints array.
*/
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size);
ks_assert(dht->endpoints_poll);
}
/**
* Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
* Populate the new endpoint data
*/
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size);
ks_assert(dht->endpoints_poll);
dht->endpoints[epindex] = ep;
dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
/**
* If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
*/
......@@ -551,12 +579,13 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
*/
if (ep) {
ks_hash_remove(dht->endpoints_hash, ep->addr.host);
ks_dht_endpoint_destroy(&ep);
dht->endpoints_length--;
}
else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock);
if (endpoint) *endpoint = NULL;
}
ks_hash_write_unlock(dht->endpoints_hash);
return ret;
}
......@@ -566,13 +595,13 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
ks_sockaddr_t raddr;
ks_assert(dht);
ks_assert(timeout > 0);
ks_assert(timeout >= 0 && timeout <= 1000);
// this should be called with a timeout of less than 1000ms, preferrably around 100ms
if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
// @todo confirm how poll/wsapoll react to zero size and NULL array
if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) {
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) {
for (int32_t i = 0; i < dht->endpoints_length; ++i) {
if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
raddr = (const ks_sockaddr_t){ 0 };
......@@ -592,49 +621,30 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
}
}
if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
ks_dht_pulse_searches(dht);
// @todo pulse_storageitems for keepalive and expiration
// hold keepalive counter on items to determine what to reannounce vs expire
ks_dht_pulse_jobs(dht);
ks_dht_pulse_send(dht);
ks_dht_pulse_expirations(dht);
ks_dht_pulse_transactions(dht);
if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
ks_dht_pulse_tokens(dht);
}
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
ks_dht_search_t *searches_first = NULL;
ks_dht_search_t *searches_last = NULL;
ks_time_t now = ks_time_now();
ks_assert(dht);
if (dht->pulse_expirations > now) return;
dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key = NULL;
ks_dht_transaction_t *value = NULL;
ks_bool_t remove = KS_FALSE;
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) remove = KS_TRUE;
else if (value->expiration <= now) {
// if the transaction expires, so does the attached job, but the job may try again with a new transaction
value->job->state = KS_DHT_JOB_STATE_EXPIRING;
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) {
ks_hash_remove(dht->transactions_hash, (void *)key);
ks_dht_transaction_destroy(&value);
}
}
ks_hash_write_unlock(dht->transactions_hash);
ks_mutex_lock(dht->searches_mutex);
for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
ks_bool_t done = KS_FALSE;
......@@ -665,14 +675,53 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
if (search->callback) search->callback(dht, search);
ks_dht_search_destroy(&search);
}
}
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
{
ks_dht_job_t *first = NULL;
ks_dht_job_t *last = NULL;
ks_assert(dht);
ks_mutex_lock(dht->jobs_mutex);
for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
ks_bool_t done = KS_FALSE;
jobn = job->next;
if (job->state == KS_DHT_JOB_STATE_QUERYING) {
job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
}
if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
job->attempts--;
if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
else done = KS_TRUE;
}
if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
if (done) {
if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
else if (!jobp) dht->jobs_first = jobn;
else if (!jobn) {
dht->jobs_last = jobp;
dht->jobs_last->next = NULL;
}
else jobp->next = jobn;
job->next = NULL;
if (last) last = last->next = job;
else first = last = job;
} else jobp = job;
}
ks_mutex_unlock(dht->jobs_mutex);
// @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
jobn = job->next;
// this cannot occur inside of the main loop, may add new jobs invalidating list pointers
if (job->finish_callback) job->finish_callback(dht, job);
ks_dht_job_destroy(&job);
}
}
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
......@@ -698,6 +747,51 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
}
}
KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
ks_time_t now = ks_time_now();
ks_assert(dht);
if (dht->transactions_pulse > now) return;
dht->transactions_pulse = now + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC);
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key = NULL;
ks_dht_transaction_t *value = NULL;
ks_bool_t remove = KS_FALSE;
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) remove = KS_TRUE;
else if (value->expiration <= now) {
// if the transaction expires, so does the attached job, but the job may try again with a new transaction
value->job->state = KS_DHT_JOB_STATE_EXPIRING;
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) ks_hash_remove(dht->transactions_hash, (void *)key);
}
ks_hash_write_unlock(dht->transactions_hash);
}
KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht)
{
ks_time_t now = ks_time_now();
ks_assert(dht);
if (dht->tokens_pulse > now) return;
dht->tokens_pulse = now + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC);
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = now + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
}
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
{
char *t = buffer;
......@@ -736,8 +830,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
// @todo change parameters to dereferenced pointer and forward buffer pointer directly
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
......@@ -775,8 +867,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
// @todo change parameters to dereferenced pointer and forward buffer pointer directly
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM;
......@@ -800,8 +890,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
// @todo change parameters to dereferenced pointer and forward buffer pointer directly
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_NO_MEM;
......@@ -825,8 +913,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
// @todo change parameters to dereferenced pointer and forward buffer pointer directly
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE);
......@@ -1269,10 +1355,9 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
// @todo atomic increment
ks_mutex_lock(dht->tid_mutex);
ks_mutex_lock(dht->transactionid_mutex);
transactionid = dht->transactionid_next++;
ks_mutex_unlock(dht->tid_mutex);
ks_mutex_unlock(dht->transactionid_mutex);
if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
......@@ -1295,7 +1380,9 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
*message = msg;
if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done;
ks_hash_write_lock(dht->transactions_hash);
ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans);
ks_hash_write_unlock(dht->transactions_hash);
if (transaction) *transaction = trans;
......@@ -1327,11 +1414,19 @@ KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
*message = NULL;
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
ep,
raddr,
transactionid,
transactionid_length,
202,
"Internal message create error");
goto done;
}
//if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
......@@ -1381,7 +1476,8 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data)
if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done;
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_READLOCKED);
ks_hash_read_lock(datagram->dht->registry_type);
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_UNLOCKED);
ks_hash_read_unlock(datagram->dht->registry_type);
if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message->type);
......@@ -1413,13 +1509,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
q = ben_dict_get_by_str(message->data, "q");
if (!q) {
ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
return KS_STATUS_FAIL;
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query missing required key 'q'");
ret = KS_STATUS_FAIL;
goto done;
}
qv = ben_str_val(q);
qv_len = ben_str_len(q);
if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len);
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query 'q' value is too large");
ret = KS_STATUS_FAIL;
goto done;
}
......@@ -1431,13 +1542,29 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
a = ben_dict_get_by_str(message->data, "a");
if (!a) {
ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n");
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query missing required key 'a'");
ret = KS_STATUS_FAIL;
goto done;
}
message->args = a;
if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query args missing required key 'id'");
goto done;
}
message->args_id = *id;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
......@@ -1447,13 +1574,41 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
message->raddr.host,
message->raddr.port,
KS_DHTRT_CREATE_PING,
&node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
&node)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal route table create node error");
goto done;
}
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal route table release node error");
goto done;
}
ks_hash_read_lock(dht->registry_query);
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED);
ks_hash_read_unlock(dht->registry_query);
if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
if (!callback) {
ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
204,
"Message query method is not registered");
}
else ret = callback(dht, message);
done:
......@@ -1504,7 +1659,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
transactionid = ntohl(*tid);
ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_lock(dht->transactions_hash);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
......@@ -1582,12 +1739,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
results_index);
// @todo add lock on node
if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]);
job->search->results[results_index] = node;
job->search->distances[results_index] = distance;
ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
ks_dhtrt_sharelock_node(node);
if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
}
......@@ -1668,10 +1828,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
ks_dhtrt_sharelock_node(n);
if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) {
ks_dhtrt_release_querynodes(&query);
goto done;
}
}
//ks_dhtrt_release_querynodes(&query);
ks_dhtrt_release_querynodes(&query);
ks_mutex_unlock(s->mutex);
locked_search = KS_FALSE;
......@@ -1812,7 +1977,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
tid = (uint32_t *)message->transactionid;
transactionid = ntohl(*tid);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_lock(dht->transactions_hash);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) {
......@@ -1834,7 +2000,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
transaction->finished = KS_TRUE;
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
ks_hash_read_lock(dht->registry_error);
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED);
ks_hash_read_unlock(dht->registry_error);
if (callback) ret = callback(dht, message);
......@@ -1854,52 +2021,6 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
ks_mutex_unlock(dht->jobs_mutex);
}
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
{
ks_dht_job_t *first = NULL;
ks_dht_job_t *last = NULL;
ks_assert(dht);
ks_mutex_lock(dht->jobs_mutex);
for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
ks_bool_t done = KS_FALSE;
jobn = job->next;
if (job->state == KS_DHT_JOB_STATE_QUERYING) {
job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
}
if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
job->attempts--;
if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
else done = KS_TRUE;
}
if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
if (done) {
if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
else if (!jobp) dht->jobs_first = jobn;
else if (!jobn) {
dht->jobs_last = jobp;
dht->jobs_last->next = NULL;
}
else jobp->next = jobn;
job->next = NULL;
if (last) last = last->next = job;
else first = last = job;
} else jobp = job;
}
ks_mutex_unlock(dht->jobs_mutex);
for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
jobn = job->next;
// this cannot occur inside of the main loop, may add new jobs invalidating list pointers
if (job->finish_callback) job->finish_callback(dht, job);
ks_dht_job_destroy(&job);
}
}
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
{
......@@ -2064,11 +2185,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
ks_assert(message);
ks_assert(message->args);
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query findnode args missing required key 'target'");
goto done;
}
want = ben_dict_get_by_str(message->args, "want");
if (want) {
// @todo use ben_list_for_each
size_t want_len = ben_list_len(want);
for (size_t i = 0; i < want_len; ++i) {
struct bencode *iv = ben_list_get(want, i);
......@@ -2087,7 +2216,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
query.nodeid = *target;
query.type = KS_DHT_REMOTE;
query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
query.max = 8; // @todo should be like KS_DHTRT_BUCKET_SIZE
if (want4) {
query.family = AF_INET;
ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
......@@ -2099,7 +2228,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
&qn->addr,
buffer4,
&buffer4_length,
sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
sizeof(buffer4))) != KS_STATUS_SUCCESS) {
ks_dhtrt_release_querynodes(&query);
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal compact v4 nodeinfo error");
goto done;
}
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
......@@ -2117,7 +2256,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
&qn->addr,
buffer6,
&buffer6_length,
sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
sizeof(buffer6))) != KS_STATUS_SUCCESS) {
ks_dhtrt_release_querynodes(&query);
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal compact v6 nodeinfo error");
goto done;
}
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
......@@ -2244,6 +2393,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_dht_storageitem_t *item = NULL;
ks_assert(dht);
ks_assert(job);
......@@ -2256,7 +2406,11 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
&message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ks_hash_read_lock(dht->storageitems_hash);
item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
ks_hash_read_unlock(dht->storageitems_hash);
if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
//ks_log(KS_LOG_DEBUG, "Sending message query get\n");
......@@ -2287,7 +2441,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
ks_assert(message);
ks_assert(message->args);
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query get args missing required key 'target'");
goto done;
}
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
......@@ -2317,7 +2480,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
&qn->addr,
buffer4,
&buffer4_length,
sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
sizeof(buffer4))) != KS_STATUS_SUCCESS) {
ks_dhtrt_release_querynodes(&query);
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal compact v4 nodeinfo error");
goto done;
}
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
......@@ -2335,7 +2508,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
&qn->addr,
buffer6,
&buffer6_length,
sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
sizeof(buffer6))) != KS_STATUS_SUCCESS) {
ks_dhtrt_release_querynodes(&query);
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal compact v6 nodeinfo error");
goto done;
}
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
......@@ -2523,22 +2706,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
sequence,
sig)) != KS_STATUS_SUCCESS) goto done;
}
if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED);
if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
} else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
if (item) job->response_storageitem = item;
else if (olditem) job->response_storageitem = olditem;
done:
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
if (item) ks_dht_storageitem_destroy(&item);
}
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
return ret;
}
// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers
// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
......@@ -2620,12 +2800,49 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
ks_assert(message->args);
if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query put args missing required key 'token'");
goto done;
}
if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query put 'k' is malformed");
goto done;
}
if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query put 'sig' is malformed");
goto done;
}
salt = ben_dict_get_by_str(message->args, "salt");
if (salt && ben_str_len(salt) > KS_DHT_STORAGEITEM_SALT_MAX_SIZE) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
207,
"Message query put 'salt' is too large");
goto done;
}
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
......@@ -2635,6 +2852,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
if (seq && (!k || !sig)) {
ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n");
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query put for mutable data must include both 'k' and 'sig'");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
......@@ -2642,6 +2866,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
v = ben_dict_get_by_str(message->args, "v");
if (!v) {
ks_log(KS_LOG_DEBUG, "Must provide v\n");
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query put args missing required key 'v'");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
......@@ -2649,23 +2880,50 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
if (!seq) {
// immutable
if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal storage item target immutable error");
goto done;
}
} else {
// mutable
if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal storage item target mutable error");
goto done;
}
}
ks_hash_write_lock(dht->storageitems_hash);
storageitems_locked = KS_TRUE;
olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
ks_log(KS_LOG_DEBUG, "Invalid token\n");
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
203,
"Message query put token is invalid");
ret = KS_STATUS_FAIL;
goto done;
}
//ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
ks_hash_write_lock(dht->storageitems_hash);
storageitems_locked = KS_TRUE;
if (!seq) {
// immutable
......@@ -2674,27 +2932,61 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
dht->pool,
&target,
v,
KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
KS_TRUE)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal storage item create immutable error");
goto done;
}
} else {
// mutable
if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) {
ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
206,
"Message query put signature is invalid");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) {
if (cas && olditem->seq != cas_seq) {
// @todo send 301 error instead of the response
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
301,
"Message query put cas mismatch");
goto done;
}
if (olditem->seq > sequence) {
// @todo send 302 error instead of the response
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
302,
"Message query put sequence is less than current");
goto done;
}
if (olditem->seq == sequence) {
if (ben_cmp(olditem->v, v) != 0) {
// @todo send 201? error instead of the response
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
201,
"Message query put sequence is equal to current but values are different");
goto done;
}
} else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
......@@ -2709,9 +3001,18 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
salt,
KS_TRUE,
sequence,
sig)) != KS_STATUS_SUCCESS) goto done;
sig)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
202,
"Internal storage item create mutable error");
goto done;
}
}
if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
if ((ret = ks_dht_response_setup(dht,
message->endpoint,
......@@ -2725,10 +3026,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
ks_q_push(dht->send_q, (void *)response);
done:
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
if (item) ks_dht_storageitem_destroy(&item);
if (item) ks_hash_remove(dht->storageitems_hash, item->id.id);
}
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
return ret;
}
......
......@@ -18,7 +18,6 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_DATAGRAM_BUFFER_SIZE 1000
//#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
#define KS_DHT_PULSE_EXPIRATIONS 1
#define KS_DHT_NODEID_SIZE 20
......@@ -30,6 +29,8 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION 10
#define KS_DHT_TRANSACTIONS_PULSE 1
#define KS_DHT_SEARCH_EXPIRATION 10
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
......@@ -40,7 +41,8 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_STORAGEITEM_EXPIRATION 7200
#define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
#define KS_DHT_TOKENSECRET_EXPIRATION 300
#define KS_DHT_TOKEN_EXPIRATION 300
#define KS_DHT_TOKENS_PULSE 1
#define KS_DHTRT_MAXQUERYSIZE 20
......@@ -257,12 +259,11 @@ struct ks_dht_s {
ks_hash_t *registry_error;
ks_dht_endpoint_t **endpoints;
int32_t endpoints_length;
int32_t endpoints_size;
ks_hash_t *endpoints_hash;
struct pollfd *endpoints_poll;
ks_time_t pulse_expirations;
ks_q_t *send_q;
ks_dht_message_t *send_q_unsent;
uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
......@@ -272,7 +273,8 @@ struct ks_dht_s {
ks_dht_job_t *jobs_first;
ks_dht_job_t *jobs_last;
ks_mutex_t *tid_mutex;
ks_time_t transactions_pulse;
ks_mutex_t *transactionid_mutex;
volatile uint32_t transactionid_next;
ks_hash_t *transactions_hash;
......@@ -283,6 +285,7 @@ struct ks_dht_s {
ks_dht_search_t *searches_first;
ks_dht_search_t *searches_last;
ks_time_t tokens_pulse;
volatile uint32_t token_secret_current;
volatile uint32_t token_secret_previous;
ks_time_t token_secret_expiration;
......@@ -323,9 +326,8 @@ KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t
* @param dht pointer to the dht instance
* @param value string of the type text under the 'y' key of a message
* @param callback the callback to be called when a message matches
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message query.
......@@ -333,9 +335,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
* @param dht pointer to the dht instance
* @param value string of the type text under the 'q' key of a message
* @param callback the callback to be called when a message matches
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message error.
......@@ -343,9 +344,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
* @param dht pointer to the dht instance
* @param value string of the errorcode under the first item of the 'e' key of a message
* @param callback the callback to be called when a message matches
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Bind a local address and port for receiving UDP datagrams.
......
......@@ -23,11 +23,11 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
s->callback = callback;
ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
ks_assert(s->searched);
ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE);
ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
ks_assert(s->searching);
ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论