提交 9e9adb8e authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9775: Incorporated route table to test find_node before adding deep…

FS-9775: Incorporated route table to test find_node before adding deep searching, but routetable bug is currently returning same values for all closest nodes results
上级 e56c3887
...@@ -8,22 +8,30 @@ KS_BEGIN_EXTERN_C ...@@ -8,22 +8,30 @@ KS_BEGIN_EXTERN_C
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address, KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
uint8_t *buffer, uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size);
KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
ks_sockaddr_t *address);
KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *address,
uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size);
KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_size_t *buffer_length, ks_size_t *buffer_length,
ks_size_t buffer_size); ks_size_t buffer_size,
KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid, ks_dht_nodeid_t *nodeid,
ks_sockaddr_t *address, ks_sockaddr_t *address);
uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size);
/** /**
* *
*/ */
KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht); KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht); KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht);
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
...@@ -44,14 +52,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t ...@@ -44,14 +52,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message);
/** /**
* *
*/ */
......
...@@ -73,7 +73,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) ...@@ -73,7 +73,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
dht->autoroute = KS_FALSE; dht->autoroute = KS_FALSE;
dht->autoroute_port = 0; dht->autoroute_port = 0;
ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht_register_type(dht, "q", ks_dht_process_query); ks_dht_register_type(dht, "q", ks_dht_process_query);
ks_dht_register_type(dht, "r", ks_dht_process_response); ks_dht_register_type(dht, "r", ks_dht_process_response);
...@@ -87,15 +87,17 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) ...@@ -87,15 +87,17 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler // @todo register 301 error for internal get/put CAS hash mismatch retry handler
dht->bind_ipv4 = KS_FALSE; dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE; dht->bind_ipv6 = KS_FALSE;
dht->endpoints = NULL; dht->endpoints = NULL;
dht->endpoints_size = 0; dht->endpoints_size = 0;
ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool); ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool);
dht->endpoints_poll = NULL; dht->endpoints_poll = NULL;
dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS;
ks_q_create(&dht->send_q, dht->pool, 0); ks_q_create(&dht->send_q, dht->pool, 0);
dht->send_q_unsent = NULL; dht->send_q_unsent = NULL;
dht->recv_buffer_length = 0; dht->recv_buffer_length = 0;
...@@ -111,7 +113,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) ...@@ -111,7 +113,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE); ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -120,30 +122,32 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) ...@@ -120,30 +122,32 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
*/ */
KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
{ {
ks_hash_iterator_t *it;
ks_assert(dht); ks_assert(dht);
// @todo free storage_hash entries
if (dht->storage_hash) { if (dht->storage_hash) {
for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key;
ks_dht_storageitem_t *val;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_storageitem_deinit(val);
ks_dht_storageitem_free(&val);
}
ks_hash_destroy(&dht->storage_hash); ks_hash_destroy(&dht->storage_hash);
dht->storage_hash = NULL;
} }
dht->token_secret_current = 0; dht->token_secret_current = 0;
dht->token_secret_previous = 0; dht->token_secret_previous = 0;
dht->token_secret_expiration = 0; dht->token_secret_expiration = 0;
if (dht->rt_ipv4) {
ks_dhtrt_deinitroute(&dht->rt_ipv4); if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4);
dht->rt_ipv4 = NULL; if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6);
}
if (dht->rt_ipv6) {
ks_dhtrt_deinitroute(&dht->rt_ipv6);
dht->rt_ipv6 = NULL;
}
dht->transactionid_next = 0; dht->transactionid_next = 0;
if (dht->transactions_hash) { if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash);
ks_hash_destroy(&dht->transactions_hash);
dht->transactions_hash = NULL;
}
dht->recv_buffer_length = 0; dht->recv_buffer_length = 0;
if (dht->send_q) { if (dht->send_q) {
ks_dht_message_t *msg; ks_dht_message_t *msg;
while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) { while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) {
...@@ -151,12 +155,14 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) ...@@ -151,12 +155,14 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
ks_dht_message_free(&msg); ks_dht_message_free(&msg);
} }
ks_q_destroy(&dht->send_q); ks_q_destroy(&dht->send_q);
dht->send_q = NULL;
} }
if (dht->send_q_unsent) { if (dht->send_q_unsent) {
ks_dht_message_deinit(dht->send_q_unsent); ks_dht_message_deinit(dht->send_q_unsent);
ks_dht_message_free(&dht->send_q_unsent); ks_dht_message_free(&dht->send_q_unsent);
} }
dht->pulse_expirations = 0;
for (int32_t i = 0; i < dht->endpoints_size; ++i) { for (int32_t i = 0; i < dht->endpoints_size; ++i) {
ks_dht_endpoint_t *ep = dht->endpoints[i]; ks_dht_endpoint_t *ep = dht->endpoints[i];
ks_dht_endpoint_deinit(ep); ks_dht_endpoint_deinit(ep);
...@@ -167,33 +173,23 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) ...@@ -167,33 +173,23 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
ks_pool_free(dht->pool, dht->endpoints); ks_pool_free(dht->pool, dht->endpoints);
dht->endpoints = NULL; dht->endpoints = NULL;
} }
if (dht->endpoints_poll) { if (dht->endpoints_poll) {
ks_pool_free(dht->pool, dht->endpoints_poll); ks_pool_free(dht->pool, dht->endpoints_poll);
dht->endpoints_poll = NULL; dht->endpoints_poll = NULL;
} }
if (dht->endpoints_hash) { if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash);
ks_hash_destroy(&dht->endpoints_hash);
dht->endpoints_hash = NULL;
}
dht->bind_ipv4 = KS_FALSE; dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE; dht->bind_ipv6 = KS_FALSE;
if (dht->registry_type) { if (dht->registry_type) ks_hash_destroy(&dht->registry_type);
ks_hash_destroy(&dht->registry_type); if (dht->registry_query) ks_hash_destroy(&dht->registry_query);
dht->registry_type = NULL; if (dht->registry_error) ks_hash_destroy(&dht->registry_error);
}
if (dht->registry_query) {
ks_hash_destroy(&dht->registry_query);
dht->registry_query = NULL;
}
if (dht->registry_error) {
ks_hash_destroy(&dht->registry_error);
dht->registry_error = NULL;
}
dht->autoroute = KS_FALSE; dht->autoroute = KS_FALSE;
dht->autoroute_port = 0; dht->autoroute_port = 0;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -204,15 +200,12 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_ ...@@ -204,15 +200,12 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_
{ {
ks_assert(dht); ks_assert(dht);
if (!autoroute) { if (!autoroute) port = 0;
port = 0; else if (port <= 0) port = KS_DHT_DEFAULT_PORT;
} else if (port <= 0) {
port = KS_DHT_DEFAULT_PORT;
}
dht->autoroute = autoroute; dht->autoroute = autoroute;
dht->autoroute_port = port; dht->autoroute_port = port;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -230,23 +223,23 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *rad ...@@ -230,23 +223,23 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *rad
ks_assert(endpoint); ks_assert(endpoint);
*endpoint = NULL; *endpoint = NULL;
ks_ip_route(ip, sizeof(ip), raddr->host); ks_ip_route(ip, sizeof(ip), raddr->host);
// @todo readlock hash ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { ks_hash_read_unlock(dht->endpoints_hash);
if (!ep && dht->autoroute) {
ks_sockaddr_t addr; ks_sockaddr_t addr;
ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) { if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
} }
if (!ep) { if (!ep) {
ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -258,7 +251,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k ...@@ -258,7 +251,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
ks_assert(dht); ks_assert(dht);
ks_assert(value); ks_assert(value);
ks_assert(callback); ks_assert(callback);
// @todo writelock registry
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
} }
...@@ -270,7 +263,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ...@@ -270,7 +263,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
ks_assert(dht); ks_assert(dht);
ks_assert(value); ks_assert(value);
ks_assert(callback); ks_assert(callback);
// @todo writelock registry
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
} }
...@@ -282,7 +275,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ...@@ -282,7 +275,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
ks_assert(dht); ks_assert(dht);
ks_assert(value); ks_assert(value);
ks_assert(callback); ks_assert(callback);
// @todo writelock registry
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
} }
...@@ -294,34 +287,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ...@@ -294,34 +287,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ks_dht_endpoint_t *ep; ks_dht_endpoint_t *ep;
ks_socket_t sock; ks_socket_t sock;
int32_t epindex; int32_t epindex;
ks_assert(dht); ks_assert(dht);
ks_assert(addr); ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
ks_assert(addr->port); ks_assert(addr->port);
if (endpoint) { if (endpoint) *endpoint = NULL;
*endpoint = NULL;
}
dht->bind_ipv4 |= addr->family == AF_INET; dht->bind_ipv4 |= addr->family == AF_INET;
dht->bind_ipv6 |= addr->family == AF_INET6; dht->bind_ipv6 |= addr->family == AF_INET6;
if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) { if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
// @todo shouldn't ks_addr_bind take a const addr *? // @todo shouldn't ks_addr_bind take a const addr *?
if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) { if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) {
ks_socket_close(&sock); ks_socket_close(&sock);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) { if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) {
ks_socket_close(&sock); ks_socket_close(&sock);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) { if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) {
ks_dht_endpoint_free(&ep); ks_dht_endpoint_free(&ep);
ks_socket_close(&sock); ks_socket_close(&sock);
...@@ -331,35 +320,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ...@@ -331,35 +320,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE); ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE);
ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE); ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE);
epindex = dht->endpoints_size++; epindex = dht->endpoints_size++;
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints, (void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
dht->endpoints[epindex] = ep; dht->endpoints[epindex] = ep;
ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep); ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll, (void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size); sizeof(struct pollfd) * dht->endpoints_size);
dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR; dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
// @todo initialize or add local nodeid to appropriate route table
if (ep->addr.family == AF_INET) { if (ep->addr.family == AF_INET) {
if (!dht->rt_ipv4) { if (!dht->rt_ipv4) ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool); ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
}
} else { } else {
if (!dht->rt_ipv6) { if (!dht->rt_ipv6) ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool); ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
}
}
if (endpoint) {
*endpoint = ep;
} }
if (endpoint) *endpoint = ep;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -369,7 +353,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ...@@ -369,7 +353,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{ {
int32_t result; int32_t result;
ks_assert(dht); ks_assert(dht);
ks_assert (timeout >= 0); ks_assert (timeout >= 0);
...@@ -378,14 +362,14 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) ...@@ -378,14 +362,14 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
if (timeout == 0) { if (timeout == 0) {
// @todo deal with default timeout, should return quickly but not hog the CPU polling // @todo deal with default timeout, should return quickly but not hog the CPU polling
} }
result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout); result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout);
if (result > 0) { if (result > 0) {
for (int32_t i = 0; i < dht->endpoints_size; ++i) { for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (dht->endpoints_poll[i].revents & POLLIN) { if (dht->endpoints_poll[i].revents & POLLIN) {
ks_sockaddr_t raddr = KS_SA_INIT; ks_sockaddr_t raddr = KS_SA_INIT;
dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE; dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE;
raddr.family = dht->endpoints[i]->addr.family; raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
// @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool
...@@ -395,69 +379,180 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) ...@@ -395,69 +379,180 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
} }
} }
ks_dht_idle(dht); ks_dht_pulse_expirations(dht);
ks_dht_pulse_send(dht);
if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address, KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
uint8_t *buffer, {
ks_size_t *buffer_length, ks_hash_iterator_t *it = NULL;
ks_size_t buffer_size) ks_time_t now = ks_time_now_sec();
ks_assert(dht);
if (dht->pulse_expirations <= now) {
dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS;
}
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key = NULL;
ks_dht_transaction_t *value = NULL;
ks_bool_t remove = KS_FALSE;
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) remove = KS_TRUE;
else if (value->expiration <= now) {
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) {
ks_hash_remove(dht->transactions_hash, (char *)key);
ks_pool_free(value->pool, value);
}
}
ks_hash_write_unlock(dht->transactions_hash);
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
}
/**
*
*/
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
{
ks_dht_message_t *message;
ks_bool_t bail = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
while (!bail) {
message = NULL;
if (dht->send_q_unsent) {
message = dht->send_q_unsent;
dht->send_q_unsent = NULL;
}
if (!message) bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
if (!bail) {
bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS;
if (ret == KS_STATUS_BREAK) dht->send_q_unsent = message;
else if (ret == KS_STATUS_SUCCESS) {
ks_dht_message_deinit(message);
ks_dht_message_free(&message);
}
}
}
}
/**
*
*/
static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
{
char *t = buffer;
ks_assert(id);
ks_assert(buffer);
memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1);
for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]);
return buffer;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size)
{ {
ks_size_t required = sizeof(uint16_t); ks_size_t addr_len;
const void *paddr = NULL;
uint16_t port = 0; uint16_t port = 0;
ks_assert(address); ks_assert(address);
ks_assert(buffer); ks_assert(buffer);
ks_assert(buffer_length); ks_assert(buffer_length);
ks_assert(buffer_size); ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6); ks_assert(address->family == AF_INET || address->family == AF_INET6);
if (address->family == AF_INET) { addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
required += sizeof(uint32_t);
} else { if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
required += 8 * sizeof(uint16_t);
}
if (*buffer_length + required > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n"); ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
if (address->family == AF_INET) { if (address->family == AF_INET) {
uint32_t *paddr = (uint32_t *)&address->v.v4.sin_addr; paddr = &address->v.v4.sin_addr; // already network byte order
uint32_t addr = htonl(*paddr); port = address->v.v4.sin_port; // already network byte order
port = htons(address->v.v4.sin_port);
memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint32_t));
*buffer_length += sizeof(uint32_t);
} else { } else {
uint16_t *paddr = (uint16_t *)&address->v.v6.sin6_addr; paddr = &address->v.v6.sin6_addr; // already network byte order
port = htons(address->v.v6.sin6_port); port = address->v.v6.sin6_port; // already network byte order
for (int32_t i = 0; i < 8; ++i) {
uint16_t addr = htons(paddr[i]);
memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint16_t));
*buffer_length += sizeof(uint16_t);
}
} }
memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t));
*buffer_length += addr_len;
memcpy(buffer + (*buffer_length), (void *)&port, sizeof(uint16_t)); memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t));
*buffer_length += sizeof(uint16_t); *buffer_length += sizeof(uint16_t);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
ks_sockaddr_t *address)
{
ks_size_t addr_len;
const void *paddr = NULL;
uint16_t port = 0;
ks_assert(buffer);
ks_assert(buffer_length);
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_FAIL;
paddr = buffer + *buffer_length;
*buffer_length += addr_len;
port = *((uint16_t *)(buffer + *buffer_length));
*buffer_length += sizeof(uint16_t);
// @todo ks_addr_set_raw second parameter should be const?
ks_addr_set_raw(address, (void *)paddr, port, address->family);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid, KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
ks_sockaddr_t *address, const ks_sockaddr_t *address,
uint8_t *buffer, uint8_t *buffer,
ks_size_t *buffer_length, ks_size_t *buffer_length,
ks_size_t buffer_size) ks_size_t buffer_size)
{ {
ks_assert(address); ks_assert(address);
ks_assert(buffer); ks_assert(buffer);
...@@ -473,7 +568,30 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid, ...@@ -473,7 +568,30 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid,
memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE); memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE);
*buffer_length += KS_DHT_NODEID_SIZE; *buffer_length += KS_DHT_NODEID_SIZE;
return ks_dht_utility_compact_address(address, buffer, buffer_length, buffer_size); return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
ks_dht_nodeid_t *nodeid,
ks_sockaddr_t *address)
{
ks_assert(buffer);
ks_assert(buffer_length);
ks_assert(nodeid);
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_FAIL;
memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
*buffer_length += KS_DHT_NODEID_SIZE;
return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address);
} }
/** /**
...@@ -490,13 +608,13 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons ...@@ -490,13 +608,13 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons
ks_assert(nodeid); ks_assert(nodeid);
*nodeid = NULL; *nodeid = NULL;
id = ben_dict_get_by_str(args, key); id = ben_dict_get_by_str(args, key);
if (!id) { if (!id) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
idv = ben_str_val(id); idv = ben_str_val(id);
idv_len = ben_str_len(id); idv_len = ben_str_len(id);
if (idv_len != KS_DHT_NODEID_SIZE) { if (idv_len != KS_DHT_NODEID_SIZE) {
...@@ -523,7 +641,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const ...@@ -523,7 +641,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
ks_assert(token); ks_assert(token);
*token = NULL; *token = NULL;
tok = ben_dict_get_by_str(args, key); tok = ben_dict_get_by_str(args, key);
if (!tok) { if (!tok) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
...@@ -558,7 +676,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra ...@@ -558,7 +676,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra
secret = htonl(secret); secret = htonl(secret);
port = htons(raddr->port); port = htons(raddr->port);
SHA1_Init(&sha); SHA1_Init(&sha);
SHA1_Update(&sha, &secret, sizeof(uint32_t)); SHA1_Update(&sha, &secret, sizeof(uint32_t));
SHA1_Update(&sha, raddr->host, strlen(raddr->host)); SHA1_Update(&sha, raddr->host, strlen(raddr->host));
...@@ -578,9 +696,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k ...@@ -578,9 +696,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k
ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok); ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok);
if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) { if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) return KS_TRUE;
return KS_TRUE;
}
ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok); ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok);
...@@ -590,212 +706,65 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k ...@@ -590,212 +706,65 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k
/** /**
* *
*/ */
KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht) KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{ {
ks_assert(dht); // @todo calculate max IPV6 payload size?
char buf[1000];
ks_dht_idle_expirations(dht); ks_size_t buf_len;
ks_dht_idle_send(dht);
}
/**
*
*/
KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
ks_time_t now = ks_time_now_sec();
ks_assert(dht); ks_assert(dht);
ks_assert(message);
ks_assert(message->endpoint);
ks_assert(message->data);
// @todo add delay between checking expirations, every 10 seconds? // @todo blacklist check
ks_hash_write_lock(dht->transactions_hash); buf_len = ben_encode2(buf, sizeof(buf), message->data);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key = NULL;
ks_dht_transaction_t *value = NULL;
ks_bool_t remove = KS_FALSE;
ks_hash_this(it, &key, NULL, (void **)&value); ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
if (value->finished) { ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
remove = KS_TRUE;
} else if (value->expiration <= now) {
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) {
ks_hash_remove(dht->transactions_hash, (char *)key);
ks_pool_free(value->pool, value);
}
}
ks_hash_write_unlock(dht->transactions_hash);
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
} }
/** /**
* *
*/ */
KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht) KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
const char *query,
ks_dht_message_callback_t callback,
ks_dht_message_t **message,
struct bencode **args)
{ {
ks_dht_message_t *message; uint32_t transactionid;
ks_bool_t bail = KS_FALSE; ks_dht_transaction_t *trans = NULL;
ks_status_t ret = KS_STATUS_SUCCESS; ks_dht_message_t *msg = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht); ks_assert(dht);
ks_assert(raddr);
while (!bail) { ks_assert(query);
message = NULL; ks_assert(callback);
if (dht->send_q_unsent) { ks_assert(message);
message = dht->send_q_unsent;
dht->send_q_unsent = NULL;
}
if (!message) {
bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
}
if (!bail) {
bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS;
if (ret == KS_STATUS_BREAK) {
dht->send_q_unsent = message;
} else if (ret == KS_STATUS_SUCCESS) {
ks_dht_message_deinit(message);
ks_dht_message_free(&message);
}
}
}
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
// @todo calculate max IPV6 payload size?
char buf[1000];
ks_size_t buf_len;
ks_assert(dht);
ks_assert(message);
ks_assert(message->endpoint);
ks_assert(message->data);
// @todo blacklist check
buf_len = ben_encode2(buf, sizeof(buf), message->data);
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr)
{
ks_dht_message_t *error = NULL;
struct bencode *e = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(errorstr);
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
goto done;
}
ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ks_q_push(dht->send_q, (void *)error);
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS && error) {
ks_dht_message_deinit(error);
ks_dht_message_free(&error);
}
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
const char *query,
ks_dht_message_callback_t callback,
ks_dht_message_t **message,
struct bencode **args)
{
uint32_t transactionid;
ks_dht_transaction_t *trans = NULL;
ks_dht_message_t *msg = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(query);
ks_assert(callback);
ks_assert(message);
*message = NULL; *message = NULL;
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
// @todo atomic increment or mutex // @todo atomic increment or mutex
transactionid = dht->transactionid_next++; transactionid = dht->transactionid_next++;
if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) { if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) goto done;
goto done;
}
if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) { if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) goto done;
goto done;
}
if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) { if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
goto done;
}
if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
goto done;
}
if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) { if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) goto done;
goto done;
}
*message = msg; *message = msg;
...@@ -836,25 +805,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, ...@@ -836,25 +805,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
ks_assert(raddr); ks_assert(raddr);
ks_assert(transactionid); ks_assert(transactionid);
ks_assert(message); ks_assert(message);
*message = NULL; *message = NULL;
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) { if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
goto done;
}
if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
goto done;
} if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) {
goto done;
}
*message = msg; *message = msg;
ret = KS_STATUS_SUCCESS; ret = KS_STATUS_SUCCESS;
...@@ -868,80 +829,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, ...@@ -868,80 +829,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
return ret; return ret;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(targetid);
if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(targetid);
if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/** /**
* *
*/ */
...@@ -961,29 +848,22 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_ ...@@ -961,29 +848,22 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_
} }
// @todo blacklist check for bad actor nodes // @todo blacklist check for bad actor nodes
if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) { if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
goto done;
}
// @todo readlocking registry for calling from threadpool if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) goto done;
if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_READLOCKED);
} else { ks_hash_read_unlock(dht->registry_type);
ret = callback(dht, &message);
} if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
else ret = callback(dht, &message);
done: done:
ks_dht_message_deinit(&message); ks_dht_message_deinit(&message);
return ret; return ret;
} }
...@@ -1009,7 +889,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1009,7 +889,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n"); ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
qv = ben_str_val(q); qv = ben_str_val(q);
qv_len = ben_str_len(q); qv_len = ben_str_len(q);
if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) { if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
...@@ -1030,12 +910,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1030,12 +910,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
message->args = a; message->args = a;
// @todo readlocking registry for calling from threadpool callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { ks_hash_read_unlock(dht->registry_query);
ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
} else { if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
ret = callback(dht, message); else ret = callback(dht, message);
}
return ret; return ret;
} }
...@@ -1069,10 +948,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t ...@@ -1069,10 +948,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash); ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) { if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
} else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG, ks_log(KS_LOG_DEBUG,
"Message response rejected due to spoofing from %s %d, expected %s %d\n", "Message response rejected due to spoofing from %s %d, expected %s %d\n",
message->raddr.host, message->raddr.host,
...@@ -1087,6 +965,93 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t ...@@ -1087,6 +965,93 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
return ret; return ret;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback)
{
ks_assert(dht);
ks_assert(id);
// @todo check hash for id to see if search already exists
// @todo if search does not exist, create new search and store in hash by id
// @todo queue callback into search, if multiple tasks are searching the same id they can all be notified of results
// @todo if search existed already and is already running then bail out and let it run
// @todo find closest nodes to id locally, store as closest results, and queue in search pending a find_node call for closer nodes
// @todo pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
// @todo upon receiving response to find_node, check for an existing search by the id
// @todo keep track of the closest K(8) nodes found to the id
// @todo if there is closer node(s) in response, update furthest search result(s) and queue find_node calls for closer nodes
// @todo if search queue is empty, call callbacks
// @todo otherwise pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
// @todo during pulse iterate searches and check for last popped timeout where find_node received no reply
// @todo pop a pending find_node call, or call callbacks if empty
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr)
{
ks_dht_message_t *error = NULL;
struct bencode *e = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(errorstr);
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done;
ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ks_q_push(dht->send_q, (void *)error);
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS && error) {
ks_dht_message_deinit(error);
ks_dht_message_free(&error);
}
return ret;
}
/** /**
* *
*/ */
...@@ -1122,7 +1087,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1122,7 +1087,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
} }
errorcode = ben_int_val(ec); errorcode = ben_int_val(ec);
et = ben_str_val(es); et = ben_str_val(es);
memcpy(error, et, es_len); memcpy(error, et, es_len);
error[es_len] = '\0'; error[es_len] = '\0';
// @todo end of ks_dht_message_parse_error // @todo end of ks_dht_message_parse_error
...@@ -1134,7 +1099,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1134,7 +1099,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash); ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) { if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
...@@ -1148,10 +1113,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1148,10 +1113,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
ks_dht_message_callback_t callback; ks_dht_message_callback_t callback;
transaction->finished = KS_TRUE; transaction->finished = KS_TRUE;
// @todo readlock on registry callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) { ks_hash_read_unlock(dht->registry_error);
ret = callback(dht, message);
} else { if (callback) ret = callback(dht, message);
else {
ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
ret = KS_STATUS_SUCCESS; ret = KS_STATUS_SUCCESS;
} }
...@@ -1160,6 +1126,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1160,6 +1126,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
return ret; return ret;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/** /**
* *
*/ */
...@@ -1168,16 +1156,21 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ...@@ -1168,16 +1156,21 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
ks_dht_nodeid_t *id; ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL; ks_dht_message_t *response = NULL;
struct bencode *r = NULL; struct bencode *r = NULL;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
// @todo add/touch bucket entry for remote node routetable = message->endpoint->node->table;
// @todo touch here, or only create if not exists?
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
...@@ -1199,6 +1192,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ...@@ -1199,6 +1192,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
ks_assert(dht);
ks_assert(message);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
routetable = message->endpoint->node->table;
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(targetid);
if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/** /**
* *
*/ */
...@@ -1215,30 +1257,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ...@@ -1215,30 +1257,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
uint8_t buffer6[1000]; uint8_t buffer6[1000];
ks_size_t buffer4_length = 0; ks_size_t buffer4_length = 0;
ks_size_t buffer6_length = 0; ks_size_t buffer6_length = 0;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
ks_dhtrt_querynodes_t query;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) { if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
}
want = ben_dict_get_by_str(message->args, "want"); want = ben_dict_get_by_str(message->args, "want");
if (want) { if (want) {
size_t want_len = ben_list_len(want); size_t want_len = ben_list_len(want);
for (size_t i = 0; i < want_len; ++i) { for (size_t i = 0; i < want_len; ++i) {
struct bencode *iv = ben_list_get(want, i); struct bencode *iv = ben_list_get(want, i);
if (!ben_cmp_with_str(iv, "n4")) { if (!ben_cmp_with_str(iv, "n4")) want4 = KS_TRUE;
want4 = KS_TRUE; if (!ben_cmp_with_str(iv, "n6")) want6 = KS_TRUE;
}
if (!ben_cmp_with_str(iv, "n6")) {
want6 = KS_TRUE;
}
} }
} }
...@@ -1247,27 +1285,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ...@@ -1247,27 +1285,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
want6 = message->raddr.family == AF_INET6; want6 = message->raddr.family == AF_INET6;
} }
// @todo add/touch bucket entry for remote node routetable = message->endpoint->node->table;
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
query.nodeid = *target;
query.type = ks_dht_remote_t;
query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
if (want4) { if (want4) {
// @todo get closest nodes to target from ipv4 route table query.family = AF_INET;
// @todo compact nodes into buffer4 ks_dhtrt_findclosest_nodes(routetable, &query);
for (int32_t i = 0; i < query.count; ++i) {
if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
&query.nodes[i]->addr,
buffer4,
&buffer4_length,
sizeof(buffer4)) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
query.nodes[i]->addr.host,
query.nodes[i]->addr.port);
}
} }
if (want6) { if (want6) {
// @todo get closest nodes to target from ipv6 route table query.family = AF_INET6;
// @todo compact nodes into buffer6 ks_dhtrt_findclosest_nodes(routetable, &query);
}
for (int32_t i = 0; i < query.count; ++i) {
// @todo remove this, testing only if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
if (ks_dht_utility_compact_node(id, &query.nodes[i]->addr,
&message->raddr, buffer6,
message->raddr.family == AF_INET ? buffer4 : buffer6, &buffer6_length,
message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length, sizeof(buffer6)) != KS_STATUS_SUCCESS) {
message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL;
return KS_STATUS_FAIL; }
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
query.nodes[i]->addr.host,
query.nodes[i]->addr.port);
}
} }
if (ks_dht_setup_response(dht, if (ks_dht_setup_response(dht,
...@@ -1281,12 +1347,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ...@@ -1281,12 +1347,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
} }
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
if (want4) { if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
}
if (want6) {
ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
}
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response); ks_q_push(dht->send_q, (void *)response);
...@@ -1294,6 +1356,112 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ...@@ -1294,6 +1356,112 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
struct bencode *n;
const uint8_t *nodes = NULL;
const uint8_t *nodes6 = NULL;
size_t nodes_size = 0;
size_t nodes6_size = 0;
size_t nodes_len = 0;
size_t nodes6_len = 0;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(dht);
ks_assert(message);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
n = ben_dict_get_by_str(message->args, "nodes");
if (n) {
nodes = (const uint8_t *)ben_str_val(n);
nodes_size = ben_str_len(n);
}
n = ben_dict_get_by_str(message->args, "nodes6");
if (n) {
nodes6 = (const uint8_t *)ben_str_val(n);
nodes6_size = ben_str_len(n);
}
routetable = message->endpoint->node->table;
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
while (nodes_len < nodes_size) {
ks_dht_nodeid_t nid;
ks_sockaddr_t addr;
addr.family = AF_INET;
if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
addr.host,
addr.port);
if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(dht->rt_ipv4, nid, ks_dht_remote_t, addr.host, addr.port, &node);
}
}
while (nodes6_len < nodes6_size) {
ks_dht_nodeid_t nid;
ks_sockaddr_t addr;
addr.family = AF_INET6;
if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
addr.host,
addr.port);
if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(dht->rt_ipv6, nid, ks_dht_remote_t, addr.host, addr.port, &node);
}
}
// @todo repeat above for ipv6 table
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(targetid);
if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/** /**
* *
*/ */
...@@ -1308,30 +1476,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ...@@ -1308,30 +1476,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
ks_dht_storageitem_t *item = NULL; ks_dht_storageitem_t *item = NULL;
ks_dht_message_t *response = NULL; ks_dht_message_t *response = NULL;
struct bencode *r = NULL; struct bencode *r = NULL;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_FAIL;
} if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
seq = ben_dict_get_by_str(message->args, "seq"); seq = ben_dict_get_by_str(message->args, "seq");
if (seq) { if (seq) sequence = ben_int_val(seq);
sequence = ben_int_val(seq);
}
// @todo add/touch bucket entry for remote node routetable = message->endpoint->node->table;
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token); ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED); item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED);
ks_hash_read_unlock(dht->storage_hash); ks_hash_read_unlock(dht->storage_hash);
...@@ -1341,7 +1509,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ...@@ -1341,7 +1509,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
// @todo find closest ipv4 and ipv6 nodes to target // @todo find closest ipv4 and ipv6 nodes to target
// @todo compact ipv4 and ipv6 nodes into separate buffers // @todo compact ipv4 and ipv6 nodes into separate buffers
if (ks_dht_setup_response(dht, if (ks_dht_setup_response(dht,
message->endpoint, message->endpoint,
&message->raddr, &message->raddr,
...@@ -1362,9 +1530,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ...@@ -1362,9 +1530,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
} }
ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq)); ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
} }
if (!sequence_snuffed) { if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
}
} }
// @todo nodes, nodes6 // @todo nodes, nodes6
...@@ -1374,19 +1540,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ...@@ -1374,19 +1540,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
ks_dht_token_t *token;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
ks_assert(dht);
ks_assert(message);
// @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo add extract function for mutable ks_dht_storageitem_key_t
// @todo add extract function for mutable ks_dht_storageitem_signature_t
routetable = message->endpoint->node->table;
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
// @todo add/touch bucket entries for other nodes/nodes6 returned
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
return KS_STATUS_SUCCESS;
}
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{ {
ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL; ks_dht_message_t *response = NULL;
struct bencode *r = NULL; struct bencode *r = NULL;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
// @todo add/touch bucket entry for remote node if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
routetable = message->endpoint->node->table;
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
...@@ -1408,63 +1617,27 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ...@@ -1408,63 +1617,27 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
{ {
ks_assert(dht); ks_dht_nodeid_t *id;
ks_assert(message); ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
// @todo add/touch bucket entry for remote node
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
// @todo add/touch bucket entry for remote node and other nodes returned if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
return KS_STATUS_SUCCESS;
}
/** routetable = message->endpoint->node->table;
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
ks_dht_token_t *token;
ks_assert(dht);
ks_assert(message);
// @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
return KS_STATUS_FAIL;
} }
if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
// @todo add extract function for mutable ks_dht_storageitem_key_t
// @todo add extract function for mutable ks_dht_storageitem_signature_t
// @todo add/touch bucket entry for remote node and other nodes returned
ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
......
...@@ -10,6 +10,7 @@ KS_BEGIN_EXTERN_C ...@@ -10,6 +10,7 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_DEFAULT_PORT 5309 #define KS_DHT_DEFAULT_PORT 5309
#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
#define KS_DHT_PULSE_EXPIRATIONS 10
#define KS_DHT_NODEID_SIZE 20 #define KS_DHT_NODEID_SIZE 20
...@@ -19,6 +20,7 @@ KS_BEGIN_EXTERN_C ...@@ -19,6 +20,7 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 #define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
#define KS_DHT_SEARCH_EXPIRATION 10
#define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64 #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
...@@ -106,6 +108,7 @@ struct ks_dht_endpoint_s { ...@@ -106,6 +108,7 @@ struct ks_dht_endpoint_s {
ks_dht_nodeid_t nodeid; ks_dht_nodeid_t nodeid;
ks_sockaddr_t addr; ks_sockaddr_t addr;
ks_socket_t sock; ks_socket_t sock;
ks_dht_node_t *node;
}; };
struct ks_dht_transaction_s { struct ks_dht_transaction_s {
...@@ -151,6 +154,8 @@ struct ks_dht_s { ...@@ -151,6 +154,8 @@ struct ks_dht_s {
ks_hash_t *endpoints_hash; ks_hash_t *endpoints_hash;
struct pollfd *endpoints_poll; struct pollfd *endpoints_poll;
ks_time_t pulse_expirations;
ks_q_t *send_q; ks_q_t *send_q;
ks_dht_message_t *send_q_unsent; ks_dht_message_t *send_q_unsent;
uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE]; uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE];
......
...@@ -62,11 +62,8 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, const ...@@ -62,11 +62,8 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, const
ks_assert(addr); ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
if (!nodeid) { if (!nodeid) randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE);
randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE); else memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
} else {
memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
}
endpoint->addr = *addr; endpoint->addr = *addr;
endpoint->sock = sock; endpoint->sock = sock;
...@@ -81,10 +78,9 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint) ...@@ -81,10 +78,9 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint)
{ {
ks_assert(endpoint); ks_assert(endpoint);
if (endpoint->sock != KS_SOCK_INVALID) { endpoint->node = NULL;
ks_socket_close(&endpoint->sock); if (endpoint->sock != KS_SOCK_INVALID) ks_socket_close(&endpoint->sock);
endpoint->sock = KS_SOCK_INVALID; endpoint->addr = (const ks_sockaddr_t){ 0 };
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
......
...@@ -59,13 +59,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_en ...@@ -59,13 +59,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_en
message->endpoint = ep; message->endpoint = ep;
message->raddr = *raddr; message->raddr = *raddr;
message->data = NULL; if (alloc_data) message->data = ben_dict();
message->args = NULL;
message->transactionid_length = 0;
message->type[0] = '\0';
if (alloc_data) {
message->data = ben_dict();
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -173,7 +167,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, ...@@ -173,7 +167,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
ks_assert(query); ks_assert(query);
tid = htonl(transactionid); tid = htonl(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t))); ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t)));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1)); ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1));
ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query))); ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
...@@ -182,9 +176,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, ...@@ -182,9 +176,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
a = ben_dict(); a = ben_dict();
ben_dict_set(message->data, ben_blob("a", 1), a); ben_dict_set(message->data, ben_blob("a", 1), a);
if (args) { if (args) *args = a;
*args = a;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -198,20 +190,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, ...@@ -198,20 +190,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
struct bencode **args) struct bencode **args)
{ {
struct bencode *r; struct bencode *r;
ks_assert(message); ks_assert(message);
ks_assert(transactionid); ks_assert(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1)); ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1));
// @note r joins message->data and will be freed with it // @note r joins message->data and will be freed with it
r = ben_dict(); r = ben_dict();
ben_dict_set(message->data, ben_blob("r", 1), r); ben_dict_set(message->data, ben_blob("r", 1), r);
if (args) { if (args) *args = r;
*args = r;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -225,20 +215,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, ...@@ -225,20 +215,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
struct bencode **args) struct bencode **args)
{ {
struct bencode *e; struct bencode *e;
ks_assert(message); ks_assert(message);
ks_assert(transactionid); ks_assert(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1)); ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1));
// @note r joins message->data and will be freed with it // @note r joins message->data and will be freed with it
e = ben_list(); e = ben_list();
ben_dict_set(message->data, ben_blob("e", 1), e); ben_dict_set(message->data, ben_blob("e", 1), e);
if (args) { if (args) *args = e;
*args = e;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
......
...@@ -185,9 +185,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item, ...@@ -185,9 +185,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item,
SHA1_Init(&sha); SHA1_Init(&sha);
SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
if (item->salt && item->salt_length > 0) { if (item->salt && item->salt_length > 0) SHA1_Update(&sha, item->salt, item->salt_length);
SHA1_Update(&sha, item->salt, item->salt_length);
}
SHA1_Final(item->id.id, &sha); SHA1_Final(item->id.id, &sha);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
......
...@@ -15,18 +15,22 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message) ...@@ -15,18 +15,22 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
} }
int main() { int main() {
ks_size_t buflen; //ks_size_t buflen;
ks_status_t err; ks_status_t err;
int mask = 0; int mask = 0;
ks_dht_t *dht1 = NULL; ks_dht_t *dht1 = NULL;
ks_dht_t dht2; ks_dht_t dht2;
ks_dht_t *dht3 = NULL;
ks_dht_endpoint_t *ep1; ks_dht_endpoint_t *ep1;
ks_dht_endpoint_t *ep2; ks_dht_endpoint_t *ep2;
ks_dht_endpoint_t *ep3;
ks_bool_t have_v4, have_v6; ks_bool_t have_v4, have_v6;
char v4[48] = {0}, v6[48] = {0}; char v4[48] = {0}, v6[48] = {0};
ks_sockaddr_t addr; ks_sockaddr_t addr;
ks_sockaddr_t raddr; ks_sockaddr_t raddr1;
//ks_sockaddr_t raddr2;
//ks_sockaddr_t raddr3;
err = ks_init(); err = ks_init();
ok(!err); ok(!err);
...@@ -61,6 +65,13 @@ int main() { ...@@ -61,6 +65,13 @@ int main() {
err = ks_dht_init(&dht2); err = ks_dht_init(&dht2);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_dht_alloc(&dht3, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_init(dht3);
ok(err == KS_STATUS_SUCCESS);
ks_dht_register_type(dht1, "z", dht_z_callback); ks_dht_register_type(dht1, "z", dht_z_callback);
if (have_v4) { if (have_v4) {
...@@ -70,13 +81,23 @@ int main() { ...@@ -70,13 +81,23 @@ int main() {
err = ks_dht_bind(dht1, NULL, &addr, &ep1); err = ks_dht_bind(dht1, NULL, &addr, &ep1);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
raddr1 = addr;
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET); err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(&dht2, NULL, &addr, &ep2); err = ks_dht_bind(&dht2, NULL, &addr, &ep2);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
raddr = addr; //raddr2 = addr;
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 2, AF_INET);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht3, NULL, &addr, &ep3);
ok(err == KS_STATUS_SUCCESS);
//raddr3 = addr;
} }
if (have_v6) { if (have_v6) {
...@@ -91,20 +112,26 @@ int main() { ...@@ -91,20 +112,26 @@ int main() {
err = ks_dht_bind(&dht2, NULL, &addr, NULL); err = ks_dht_bind(&dht2, NULL, &addr, NULL);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht3, NULL, &addr, NULL);
ok(err == KS_STATUS_SUCCESS);
} }
diag("Custom type tests\n"); //diag("Custom type tests\n");
buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER); //buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen); //memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
dht1->recv_buffer_length = buflen; //dht1->recv_buffer_length = buflen;
err = ks_dht_process(dht1, ep1, &raddr); //err = ks_dht_process(dht1, ep1, &raddr);
ok(err == KS_STATUS_SUCCESS); //ok(err == KS_STATUS_SUCCESS);
ks_dht_pulse(dht1, 100); //ks_dht_pulse(dht1, 100);
ks_dht_pulse(&dht2, 100); //ks_dht_pulse(&dht2, 100);
//buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
...@@ -115,20 +142,39 @@ int main() { ...@@ -115,20 +142,39 @@ int main() {
//ok(err == KS_STATUS_SUCCESS); //ok(err == KS_STATUS_SUCCESS);
diag("Ping tests\n"); diag("Ping test\n");
ks_dht_send_ping(dht1, ep1, &raddr); ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue ping from dht2 to dht1
ks_dht_pulse(dht1, 100); ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1
ks_dht_pulse(&dht2, 100); ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1
// Test blind find_node from dht3 to dht1 to find dht2 nodeid
diag("Find_Node test\n");
ks_dht_pulse(dht1, 100); ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL);
diag("Cleanup\n"); diag("Cleanup\n");
/* Cleanup and shutdown */ /* Cleanup and shutdown */
err = ks_dht_deinit(dht3);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_free(&dht3);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_deinit(&dht2); err = ks_dht_deinit(&dht2);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论