提交 b88437fc authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9775: Some cleanup, some commenting, some fixes.

上级 9e9adb8e
......@@ -6,21 +6,98 @@
KS_BEGIN_EXTERN_C
/**
*
* Determines the appropriate endpoint to reach a remote address.
* If an endpoint is provided, nothing more needs to be done.
* If no endpoint is provided, first it will check for an active endpoint it can route though.
* If no active endpoint is available and autorouting is enabled it will attempt to bind a usable endpoint.
* @param dht pointer to the dht instance
* @param raddr pointer to the remote address
* @param endpoint dereferenced in/out pointer to the endpoint, if populated then returns immediately
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_ip_route
* @see ks_hash_read_unlock
* @see ks_addr_set
* @see ks_dht_bind
*/
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
/**
* Called internally to expire various data.
* Handles purging of expired and finished transactions, rotating token secrets, etc.
* @param dht pointer to the dht instance
*/
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
/**
* Called internally to send queued messages.
* Handles throttling of message sending to ensure system buffers are not overloaded and messages are not dropped.
* @param dht pointer to the dht instance
*/
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
/**
* Converts a ks_dht_nodeid_t into it's hex string representation.
* @param id pointer to the nodeid
* @param buffer pointer to the buffer able to contain at least (KS_DHT_NODEID_SIZE * 2) + 1 characters
* @return The pointer to the front of the populated string buffer
*/
KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer);
/**
* Compacts address information as per the DHT specifications.
* @param address pointer to the address being compacted from
* @param buffer pointer to the buffer containing compacted data
* @param buffer_length pointer to the buffer length consumed
* @param buffer_size max size of the buffer
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM
*/
KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size);
/**
* Expands address information as per the DHT specifications.
* @param buffer pointer to the buffer containing compacted data
* @param buffer_length pointer to the buffer length consumed
* @param buffer_size max size of the buffer
* @param address pointer to the address being expanded into
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM, ...
* @see ks_addr_set_raw
*/
KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
ks_sockaddr_t *address);
/**
* Compacts node information as per the DHT specifications.
* Compacts address information after the nodeid.
* @param nodeid pointer to the nodeid being compacted from
* @param address pointer to the address being compacted from
* @param buffer pointer to the buffer containing compacted data
* @param buffer_length pointer to the buffer length consumed
* @param buffer_size max size of the buffer
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM, ...
* @see ks_dht_utility_compact_addressinfo
*/
KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *address,
uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size);
/**
* Expands address information as per the DHT specifications.
* Expands compacted address information after the nodeid.
* @param buffer pointer to the buffer containing compacted data
* @param buffer_length pointer to the buffer length consumed
* @param buffer_size max size of the buffer
* @param address pointer to the address being expanded into
* @param nodeid pointer to the nodeid being expanded into
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM, ...
* @see ks_dht_utility_expand_addressinfo
*/
KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
......@@ -28,12 +105,107 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_sockaddr_t *address);
/**
*
* Extracts a ks_dht_nodeid_t from a bencode dictionary given a string key.
* @param args pointer to the bencode dictionary
* @param key string key in the bencode dictionary to extract the value from
* @param nodeid dereferenced out pointer to the nodeid
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_ARG_INVALID
*/
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, const char *key, ks_dht_nodeid_t **nodeid);
/**
* Extracts a ks_dht_token_t from a bencode dictionary given a string key.
* @param args pointer to the bencode dictionary
* @param key string key in the bencode dictionary to extract the value from
* @param nodeid dereferenced out pointer to the token
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_ARG_INVALID
*/
KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const char *key, ks_dht_token_t **token);
/**
* Generates an opaque write token based on a shifting secret value, the remote address and target nodeid of interest.
* This token ensures that future operations can be verified to the remote peer and target id requested.
* @param secret rotating secret portion of the token hash
* @param raddr pointer to the remote address used for the ip and port in the token hash
* @param target pointer to the nodeid of the target used for the token hash
* @param token pointer to the output token being generated
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
/**
* Verify an opaque write token matches the provided remote address and target nodeid.
* Handles checking against the last two secret values for the token hash.
* @param dht pointer to the dht instance
* @param raddr pointer to the remote address used for the ip and port in the token hash
* @param target pointer to the nodeid of the target used for the token hash
* @param token pointer to the input token being compared
* @return Either KS_TRUE if verification passes, otherwise KS_FALSE
*/
KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
/**
* Encodes a message for transmission as a UDP datagram and sends it.
* Uses the internally tracked local endpoint and remote address to route the UDP datagram.
* @param dht pointer to the dht instance
* @param message pointer to the message being sent
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_socket_sendto
*/
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
/**
* Sets up the common parts of a query message.
* Determines the local endpoint aware of autorouting, assigns the remote address, generates a transaction, and queues a callback.
* @param dht pointer to the dht instance
* @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one
* @param raddr pointer to the remote address
* @param query string value of the query type, for example "ping"
* @param callback callback to be called when response to transaction is received
* @param message dereferenced out pointer to the allocated message
* @param args dereferenced out pointer to the allocated bencode args, may be NULL to ignore output
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
* @see ks_dht_autoroute_check
* @see ks_dht_transaction_alloc
* @see ks_dht_transaction_init
* @see ks_dht_message_alloc
* @see ks_dht_message_init
* @see ks_dht_message_query
* @see ks_hash_insert
*/
KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
const char *query,
ks_dht_message_callback_t callback,
ks_dht_message_t **message,
struct bencode **args);
/**
* Sets up the common parts of a response message.
* Determines the local endpoint aware of autorouting, assigns the remote address, and assigns the transaction.
* @param dht pointer to the dht instance
* @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one
* @param raddr pointer to the remote address
* @param transactionid pointer to the buffer containing the transactionid, may be of variable size depending on the querying node
* @param transactionid_length length of the transactionid buffer
* @param message dereferenced out pointer to the allocated message
* @param args dereferenced out pointer to the allocated bencode args, may be NULL to ignore output
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_dht_autoroute_check
* @see ks_dht_message_alloc
* @see ks_dht_message_init
* @see ks_dht_message_response
*/
KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
ks_dht_message_t **message,
struct bencode **args);
KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
......
......@@ -2,250 +2,426 @@
#include "ks_dht-int.h"
#include "sodium.h"
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool)
{
ks_bool_t pool_alloc = !pool;
ks_dht_t *d;
ks_assert(dht);
/**
* Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created.
*/
if (pool_alloc) ks_pool_open(&pool);
if (!pool) return KS_STATUS_NO_MEM;
/**
* Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created.
*/
*dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t));
if (!d) return KS_STATUS_NO_MEM;
/**
* Keep track of the pool used for future allocations and cleanup.
* Keep track of whether the pool was created internally or not.
*/
d->pool = pool;
d->pool_alloc = pool_alloc;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool)
KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool)
{
ks_assert(dht);
ks_assert(pool);
/**
* Treat preallocate function like allocate, zero the memory like pool allocations do.
*/
memset(dht, 0, sizeof(ks_dht_t));
/**
* Keep track of the pool used for future allocations, pool must
*/
dht->pool = pool;
dht->pool_alloc = KS_FALSE;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht)
{
ks_pool_t *pool;
ks_bool_t pool_alloc;
ks_pool_t *pool = NULL;
ks_bool_t pool_alloc = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(*dht);
/**
* Call ks_dht_deinit to ensure everything has been cleaned up internally.
* The pool member variables must not be messed with in deinit, they are managed at the allocator layer.
*/
if ((ret = ks_dht_deinit(*dht)) != KS_STATUS_SUCCESS) return ret;
/**
* Temporarily store the allocator level variables because freeing the dht instance will invalidate it.
*/
pool = (*dht)->pool;
pool_alloc = (*dht)->pool_alloc;
ks_dht_deinit(*dht);
ks_pool_free(pool, *dht);
if (pool_alloc) {
ks_pool_close(&pool);
}
/**
* Free the dht instance from the pool, after this the dht instance memory is invalid.
*/
if ((ret = ks_pool_free((*dht)->pool, *dht)) != KS_STATUS_SUCCESS) return ret;
/**
* At this point dht instance is invalidated so NULL the pointer.
*/
*dht = NULL;
/**
* If the pool was allocated internally, destroy it using the temporary variables stored earlier.
* If this fails, something catastrophically bad happened like memory corruption.
*/
if (pool_alloc && (ret = ks_pool_close(&pool)) != KS_STATUS_SUCCESS) return ret;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(dht->pool);
/**
* Default autorouting to disabled.
*/
dht->autoroute = KS_FALSE;
dht->autoroute_port = 0;
ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
/**
* Create the message type registry.
*/
if ((ret = ks_hash_create(&dht->registry_type,
KS_HASH_MODE_DEFAULT,
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
dht->pool)) != KS_STATUS_SUCCESS) return ret;
/**
* Register the message type callbacks for query (q), response (r), and error (e)
*/
ks_dht_register_type(dht, "q", ks_dht_process_query);
ks_dht_register_type(dht, "r", ks_dht_process_response);
ks_dht_register_type(dht, "e", ks_dht_process_error);
ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
/**
* Create the message query registry.
*/
if ((ret = ks_hash_create(&dht->registry_query,
KS_HASH_MODE_DEFAULT,
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
dht->pool)) != KS_STATUS_SUCCESS) return ret;
/**
* Register the message query callbacks for ping, find_node, etc.
*/
ks_dht_register_query(dht, "ping", ks_dht_process_query_ping);
ks_dht_register_query(dht, "find_node", ks_dht_process_query_findnode);
ks_dht_register_query(dht, "get", ks_dht_process_query_get);
ks_dht_register_query(dht, "put", ks_dht_process_query_put);
ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
/**
* Create the message error registry.
*/
if ((ret = ks_hash_create(&dht->registry_error,
KS_HASH_MODE_DEFAULT,
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
dht->pool)) != KS_STATUS_SUCCESS) return ret;
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
/**
* Default these to FALSE, binding will set them TRUE when a respective address is bound.
* @todo these may not be useful anymore they are from legacy code
*/
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
/**
* Initialize the data used to track endpoints to NULL, binding will handle latent allocations.
* The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
*/
dht->endpoints = NULL;
dht->endpoints_size = 0;
ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool);
dht->endpoints_poll = NULL;
/**
* Create the endpoints hash for fast lookup, this is used to route externally provided remote addresses when the local endpoint is unknown.
* This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
* This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent
*/
if ((ret = ks_hash_create(&dht->endpoints_hash,
KS_HASH_MODE_DEFAULT,
KS_HASH_FLAG_RWLOCK,
dht->pool)) != KS_STATUS_SUCCESS) return ret;
/**
* Default expirations to not be checked for one pulse.
*/
dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS;
ks_q_create(&dht->send_q, dht->pool, 0);
/**
* Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
*/
if ((ret = ks_q_create(&dht->send_q, dht->pool, 0)) != KS_STATUS_SUCCESS) return ret;
/**
* If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message.
*/
dht->send_q_unsent = NULL;
/**
* The dht uses a single internal large receive buffer for receiving all frames, this may change in the future to offload processing to a threadpool.
*/
dht->recv_buffer_length = 0;
/**
* Initialize the first transaction id randomly, this doesn't really matter.
*/
dht->transactionid_next = 1; //rand();
ks_hash_create(&dht->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, dht->pool);
/**
* Create the hash to track pending transactions on queries that are pending responses.
* It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred.
*/
if ((ret = ks_hash_create(&dht->transactions_hash,
KS_HASH_MODE_INT,
KS_HASH_FLAG_RWLOCK,
dht->pool)) != KS_STATUS_SUCCESS) return ret;
/**
* The internal route tables will be latent allocated when binding.
*/
dht->rt_ipv4 = NULL;
dht->rt_ipv6 = NULL;
/**
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
dht->token_secret_current = dht->token_secret_previous = rand();
dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
/**
* Create the hash to store arbitrary data for BEP44.
*/
if ((ret = ks_hash_create(&dht->storage_hash,
KS_HASH_MODE_ARBITRARY,
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
dht->pool)) != KS_STATUS_SUCCESS) return ret;
/**
* The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
*/
ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
{
ks_hash_iterator_t *it;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
/**
* Cleanup the storage hash and it's contents if it is allocated.
*/
if (dht->storage_hash) {
for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key;
ks_dht_storageitem_t *val;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_storageitem_deinit(val);
ks_dht_storageitem_free(&val);
if ((ret = ks_dht_storageitem_deinit(val)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_storageitem_free(&val)) != KS_STATUS_SUCCESS) return ret;
}
ks_hash_destroy(&dht->storage_hash);
}
/**
* Zero out the opaque write token variables.
*/
dht->token_secret_current = 0;
dht->token_secret_previous = 0;
dht->token_secret_expiration = 0;
/**
* Cleanup the route tables if they are allocated.
*/
if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6);
/**
* Cleanup the transactions hash if it is allocated.
*/
dht->transactionid_next = 0;
if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash);
/**
* Probably don't need this, recv_buffer_length is temporary and may change
*/
dht->recv_buffer_length = 0;
/**
* Cleanup the send queue and it's contents if it is allocated.
*/
if (dht->send_q) {
ks_dht_message_t *msg;
while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) {
ks_dht_message_deinit(msg);
ks_dht_message_free(&msg);
if ((ret = ks_dht_message_deinit(msg)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_message_free(&msg)) != KS_STATUS_SUCCESS) return ret;
}
ks_q_destroy(&dht->send_q);
if ((ret = ks_q_destroy(&dht->send_q)) != KS_STATUS_SUCCESS) return ret;
}
/**
* Cleanup the cached popped message if it is set.
*/
if (dht->send_q_unsent) {
ks_dht_message_deinit(dht->send_q_unsent);
ks_dht_message_free(&dht->send_q_unsent);
if ((ret = ks_dht_message_deinit(dht->send_q_unsent)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_message_free(&dht->send_q_unsent)) != KS_STATUS_SUCCESS) return ret;
}
/**
* Probably don't need this
*/
dht->pulse_expirations = 0;
/**
* Cleanup any endpoints that have been allocated.
*/
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
ks_dht_endpoint_t *ep = dht->endpoints[i];
ks_dht_endpoint_deinit(ep);
ks_dht_endpoint_free(&ep);
if ((ret = ks_dht_endpoint_deinit(ep)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_endpoint_free(&ep)) != KS_STATUS_SUCCESS) return ret;
}
dht->endpoints_size = 0;
/**
* Cleanup the array of endpoint pointers if it is allocated.
*/
if (dht->endpoints) {
ks_pool_free(dht->pool, dht->endpoints);
if ((ret = ks_pool_free(dht->pool, dht->endpoints)) != KS_STATUS_SUCCESS) return ret;
dht->endpoints = NULL;
}
/**
* Cleanup the array of endpoint polling data if it is allocated.
*/
if (dht->endpoints_poll) {
ks_pool_free(dht->pool, dht->endpoints_poll);
if ((ret = ks_pool_free(dht->pool, dht->endpoints_poll)) != KS_STATUS_SUCCESS) return ret;
dht->endpoints_poll = NULL;
}
/**
* Cleanup the endpoints hash if it is allocated.
*/
if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash);
/**
* Probably don't need this
*/
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
/**
* Cleanup the type, query, and error registries if they have been allocated.
*/
if (dht->registry_type) ks_hash_destroy(&dht->registry_type);
if (dht->registry_query) ks_hash_destroy(&dht->registry_query);
if (dht->registry_error) ks_hash_destroy(&dht->registry_error);
/**
* Probably don't need this
*/
dht->autoroute = KS_FALSE;
dht->autoroute_port = 0;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port)
KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port)
{
ks_assert(dht);
/**
* If autorouting is being disabled, port is always set to zero, otherwise if the port is zero use the DHT default port
*/
if (!autoroute) port = 0;
else if (port <= 0) port = KS_DHT_DEFAULT_PORT;
/**
* Set the autoroute state
*/
dht->autoroute = autoroute;
dht->autoroute_port = port;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
{
// @todo lookup standard def for IPV6 max size
char ip[48];
ks_dht_endpoint_t *ep = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(endpoint);
*endpoint = NULL;
/**
* If the endpoint is already provided just leave it alone and return successfully.
*/
if (*endpoint) return KS_STATUS_SUCCESS;
ks_ip_route(ip, sizeof(ip), raddr->host);
/**
* Use the remote address to figure out what local address we should use to attempt contacting it.
*/
if ((ret = ks_ip_route(ip, sizeof(ip), raddr->host)) != KS_STATUS_SUCCESS) return ret;
/**
* Check if the endpoint has already been bound for the address we want to route through.
* @todo ip:port for key to allow a single ip with multiple endpoints on different ports
*/
ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
ks_hash_read_unlock(dht->endpoints_hash);
if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
/**
* If the endpoint has not been bound, and autorouting is enabled then try to bind the new address.
*/
if (!ep && dht->autoroute) {
ks_sockaddr_t addr;
ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if ((ret = ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_bind(dht, NULL, &addr, &ep)) != KS_STATUS_SUCCESS) return ret;
}
/**
* If no endpoint can be found to route through then all hope is lost, bail out with a failure.
*/
if (!ep) {
ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
return KS_STATUS_FAIL;
}
/**
* Reaching here means an endpoint is available, assign it and return successfully.
*/
*endpoint = ep;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
......@@ -255,9 +431,6 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
......@@ -267,9 +440,6 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
......@@ -279,77 +449,128 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
{
ks_dht_endpoint_t *ep;
ks_socket_t sock;
int32_t epindex;
ks_dht_endpoint_t *ep = NULL;
ks_socket_t sock = KS_SOCK_INVALID;
int32_t epindex = 0;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
ks_assert(addr->port);
/**
* If capturing the endpoint output, make sure it is set NULL to start with.
*/
if (endpoint) *endpoint = NULL;
/**
* Legacy code, this can probably go away
*/
dht->bind_ipv4 |= addr->family == AF_INET;
dht->bind_ipv6 |= addr->family == AF_INET6;
/**
* Attempt to open a UDP datagram socket for the given address family.
*/
if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
/**
* Set some common socket options for non-blocking IO and forced binding when already in use
*/
if ((ret = ks_socket_option(sock, SO_REUSEADDR, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_socket_option(sock, KS_SO_NONBLOCK, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
/**
* Attempt to bind the socket to the desired local address.
*/
// @todo shouldn't ks_addr_bind take a const addr *?
if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) {
ks_socket_close(&sock);
return KS_STATUS_FAIL;
}
if ((ret = ks_addr_bind(sock, (ks_sockaddr_t *)addr)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) {
ks_socket_close(&sock);
return KS_STATUS_FAIL;
}
if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) {
ks_dht_endpoint_free(&ep);
ks_socket_close(&sock);
return KS_STATUS_FAIL;
}
ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE);
ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE);
/**
* Allocate the endpoint to track the local socket.
*/
if ((ret = ks_dht_endpoint_alloc(&ep, dht->pool)) != KS_STATUS_SUCCESS) goto done;
/**
* Initialize the node, may provide NULL nodeid to have one generated internally.
*/
if ((ret = ks_dht_endpoint_init(ep, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done;
/**
* Resize the endpoints array to take another endpoint pointer.
*/
epindex = dht->endpoints_size++;
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
dht->endpoints[epindex] = ep;
ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
/**
* Add the new endpoint into the endpoints hash for quick lookups.
* @todo ip:port for key to allow a single ip with multiple endpoints on different ports
*/
if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) {
ret = KS_STATUS_FAIL;
goto done;
}
/**
* Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
*/
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size);
dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
/**
* If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
*/
if (ep->addr.family == AF_INET) {
if (!dht->rt_ipv4) ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv4,
ep->nodeid,
ks_dht_local_t,
ep->addr.host,
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
} else {
if (!dht->rt_ipv6) ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
ep->nodeid,
ks_dht_local_t,
ep->addr.host,
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
}
/**
* If the endpoint output is being captured, assign it and return successfully.
*/
if (endpoint) *endpoint = ep;
return KS_STATUS_SUCCESS;
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS) {
/**
* If any failures occur, we need to make sure the socket is properly closed.
* This will be done in ks_dht_endpoint_deinit only if the socket was assigned during a successful ks_dht_endpoint_init.
* Then return whatever failure condition resulted in landed here.
*/
if (sock != KS_SOCK_INVALID && ep && ep->sock == KS_SOCK_INVALID) ks_socket_close(&sock);
if (ep) {
ks_dht_endpoint_deinit(ep);
ks_dht_endpoint_free(&ep);
}
}
return ret;
}
/**
*
*/
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{
int32_t result;
......@@ -387,9 +608,6 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
}
/**
*
*/
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
......@@ -427,9 +645,6 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
}
}
/**
*
*/
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
{
ks_dht_message_t *message;
......@@ -456,10 +671,7 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
}
}
/**
*
*/
static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
{
char *t = buffer;
......@@ -473,9 +685,6 @@ static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
return buffer;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
uint8_t *buffer,
ks_size_t *buffer_length,
......@@ -495,7 +704,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_FAIL;
return KS_STATUS_NO_MEM;
}
if (address->family == AF_INET) {
......@@ -514,9 +723,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
......@@ -532,7 +738,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_FAIL;
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM;
paddr = buffer + *buffer_length;
*buffer_length += addr_len;
......@@ -540,14 +746,9 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
*buffer_length += sizeof(uint16_t);
// @todo ks_addr_set_raw second parameter should be const?
ks_addr_set_raw(address, (void *)paddr, port, address->family);
return KS_STATUS_SUCCESS;
return ks_addr_set_raw(address, (void *)paddr, port, address->family);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *address,
uint8_t *buffer,
......@@ -562,7 +763,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_FAIL;
return KS_STATUS_NO_MEM;
}
memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE);
......@@ -571,9 +772,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n
return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_size_t *buffer_length,
ks_size_t buffer_size,
......@@ -586,7 +784,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_FAIL;
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
*buffer_length += KS_DHT_NODEID_SIZE;
......@@ -594,9 +792,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, const char *key, ks_dht_nodeid_t **nodeid)
{
struct bencode *id;
......@@ -612,14 +807,14 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons
id = ben_dict_get_by_str(args, key);
if (!id) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
return KS_STATUS_FAIL;
return KS_STATUS_ARG_INVALID;
}
idv = ben_str_val(id);
idv_len = ben_str_len(id);
if (idv_len != KS_DHT_NODEID_SIZE) {
ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, idv_len);
return KS_STATUS_FAIL;
return KS_STATUS_ARG_INVALID;
}
*nodeid = (ks_dht_nodeid_t *)idv;
......@@ -627,9 +822,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const char *key, ks_dht_token_t **token)
{
struct bencode *tok;
......@@ -645,14 +837,14 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
tok = ben_dict_get_by_str(args, key);
if (!tok) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
return KS_STATUS_FAIL;
return KS_STATUS_ARG_INVALID;
}
tokv = ben_str_val(tok);
tokv_len = ben_str_len(tok);
if (tokv_len != KS_DHT_TOKEN_SIZE) {
ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, tokv_len);
return KS_STATUS_FAIL;
return KS_STATUS_ARG_INVALID;
}
*token = (ks_dht_token_t *)tokv;
......@@ -661,9 +853,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
{
SHA_CTX sha;
......@@ -677,35 +866,30 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra
secret = htonl(secret);
port = htons(raddr->port);
SHA1_Init(&sha);
SHA1_Update(&sha, &secret, sizeof(uint32_t));
SHA1_Update(&sha, raddr->host, strlen(raddr->host));
SHA1_Update(&sha, &port, sizeof(uint16_t));
SHA1_Update(&sha, target->id, KS_DHT_NODEID_SIZE);
SHA1_Final(token->token, &sha);
if (!SHA1_Init(&sha) ||
!SHA1_Update(&sha, &secret, sizeof(uint32_t)) ||
!SHA1_Update(&sha, raddr->host, strlen(raddr->host)) ||
!SHA1_Update(&sha, &port, sizeof(uint16_t)) ||
!SHA1_Update(&sha, target->id, KS_DHT_NODEID_SIZE) ||
!SHA1_Final(token->token, &sha)) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
{
ks_dht_token_t tok;
ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok);
if (ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok) != KS_STATUS_SUCCESS) return KS_FALSE;
if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) return KS_TRUE;
if (memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0) return KS_TRUE;
ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok);
if (ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok) != KS_STATUS_SUCCESS) return KS_FALSE;
return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
// @todo calculate max IPV6 payload size?
......@@ -727,9 +911,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
......@@ -751,24 +933,27 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
*message = NULL;
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
// @todo atomic increment or mutex
transactionid = dht->transactionid_next++;
if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_transaction_alloc(&trans, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_transaction_init(trans, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_alloc(&msg, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_init(msg, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
*message = msg;
ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans);
if (!ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) {
ret = KS_STATUS_FAIL;
goto done;
}
ret = KS_STATUS_SUCCESS;
......@@ -787,9 +972,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
......@@ -808,13 +990,13 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
*message = NULL;
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_alloc(&msg, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_init(msg, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
*message = msg;
......@@ -829,9 +1011,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
{
ks_dht_message_t message;
......@@ -867,9 +1047,6 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message)
{
struct bencode *q;
......@@ -919,9 +1096,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message)
{
struct bencode *r;
......@@ -966,10 +1140,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback)
{
ks_assert(dht);
......@@ -1007,10 +1177,6 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
......@@ -1052,9 +1218,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message)
{
struct bencode *e;
......@@ -1127,9 +1290,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
{
ks_dht_message_t *message = NULL;
......@@ -1148,9 +1308,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1192,9 +1349,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1218,9 +1372,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
ks_dht_message_t *message = NULL;
......@@ -1241,9 +1392,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1356,9 +1504,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1438,9 +1583,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
ks_dht_message_t *message = NULL;
......@@ -1462,9 +1604,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1540,9 +1679,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1574,9 +1710,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
}
/**
*
*/
// @todo ks_dht_send_put
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......@@ -1617,9 +1752,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_nodeid_t *id;
......
......@@ -174,45 +174,176 @@ struct ks_dht_s {
};
/**
*
* Allocator function for ks_dht_t.
* Should be used when a ks_dht_t is allocated on the heap, and may provide an external memory pool or allocate one internally.
* @param dht dereferenced out pointer to the allocated dht instance
* @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new pool internally
* @param The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM
*/
KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht);
/**
* Preallocator function for ks_dht_t.
* Should be used when a ks_dht_t is preallocated on the stack or within another structure, and must provide an external memory pool.
* @param dht pointer to the dht instance
* @param pool pointer to the memory pool used by the dht instance
*/
KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool);
/**
* Deallocator function for ks_dht_t.
* Must be used when a ks_dht_t is allocated using ks_dht_alloc, will also destroy memory pool if it was created internally.
* @param dht dereferenced in/out pointer to the dht instance, NULL upon return
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_dht_deinit
* @see ks_pool_free
* @see ks_pool_close
*/
KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht);
/**
* Constructor function for ks_dht_t.
* Must be used regardless of how ks_dht_t is allocated, will allocate and initialize internal state including registration of message handlers.
* @param dht pointer to the dht instance
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_hash_create
* @see ks_dht_register_type
* @see ks_q_create
*/
KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht);
/**
* Destructor function for ks_dht_t.
* Must be used regardless of how ks_dht_t is allocated, will deallocate and deinitialize internal state.
* @param dht pointer to the dht instance
* @return The ks_status_t result: KS_STATUS_SUCCESS, ...
* @see ks_dht_storageitem_deinit
* @see ks_dht_storageitem_free
* @see ks_hash_destroy
* @see ks_dht_message_deinit
* @see ks_dht_message_free
* @see ks_q_destroy
* @see ks_dht_endpoint_deinit
* @see ks_dht_endpoint_free
* @see ks_pool_free
*/
KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht);
KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port);
/**
* Enable or disable (default) autorouting support.
* When enabled, autorouting will allow sending to remote addresses on interfaces which are not yet bound.
* The address will be bound with the provided autoroute port when this occurs.
* @param dht pointer to the dht instance
* @param autoroute enable or disable autorouting
* @param port when enabling autorouting this port will be used to bind new addresses, may be 0 to use the default DHT port
*/
KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port);
/**
* Register a callback for a specific message type.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the type text under the 'y' key of a message
* @param callback the callback to be called when a message matches
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message query.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the type text under the 'q' key of a message
* @param callback the callback to be called when a message matches
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message error.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the errorcode under the first item of the 'e' key of a message
* @param callback the callback to be called when a message matches
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Bind a local address and port for receiving UDP datagrams.
* @param dht pointer to the dht instance
* @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly
* @param addr pointer to the remote address information
* @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
* @see ks_socket_option
* @see ks_addr_bind
* @see ks_dht_endpoint_alloc
* @see ks_dht_endpoint_init
* @see ks_hash_insert
* @see ks_dhtrt_initroute
* @see ks_dhtrt_create_node
*/
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
/**
* Pulse the internals of dht.
* Handles receiving UDP datagrams, dispatching processing, handles expirations, throttled message sending, route table pulsing, etc.
* @param dht pointer to the dht instance
* @param timeout timeout value used when polling sockets for new UDP datagrams
*/
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_alloc(ks_dht_message_t **message, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_prealloc(ks_dht_message_t *message, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_free(ks_dht_message_t **message);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_bool_t alloc_data);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_deinit(ks_dht_message_t *message);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
uint32_t transactionid,
const char *query,
struct bencode **args);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
......@@ -221,21 +352,30 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht_transaction_prealloc(ks_dht_transaction_t *transaction, ks_pool_t *pool);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_free(ks_dht_transaction_t **transaction);
KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transaction,
ks_sockaddr_t *raddr,
uint32_t transactionid,
ks_dht_message_callback_t callback);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction);
/**
* route table methods
*
......@@ -244,10 +384,10 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,
ks_dht_nodeid_t nodeid,
enum ks_dht_nodetype_t type,
char* ip, unsigned short port,
ks_dht_node_t** node);
ks_dht_nodeid_t nodeid,
enum ks_dht_nodetype_t type,
char* ip, unsigned short port,
ks_dht_node_t** node);
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node);
......
......@@ -59,8 +59,7 @@ int main() {
err = ks_dht_init(dht1);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_prealloc(&dht2, dht1->pool);
ok(err == KS_STATUS_SUCCESS);
ks_dht_prealloc(&dht2, dht1->pool);
err = ks_dht_init(&dht2);
ok(err == KS_STATUS_SUCCESS);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论