提交 5dfd6d1b authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9775: Bug fixes and exposed interface changes while implementing tests for…

FS-9775: Bug fixes and exposed interface changes while implementing tests for get/put which are functional and pass initial tests now. Deep searching needs to be revamped now to complete the full announcing process.
上级 4970c7e9
......@@ -41,7 +41,7 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
* @param buffer pointer to the buffer able to contain at least (KS_DHT_NODEID_SIZE * 2) + 1 characters
* @return The pointer to the front of the populated string buffer
*/
KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer);
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len);
/**
* Compacts address information as per the DHT specifications.
......@@ -270,9 +270,9 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length);
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item);
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
......@@ -286,6 +286,32 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_socket_t sock);
KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool,
ks_dht_endpoint_t *endpoint,
const ks_sockaddr_t *raddr,
ks_bool_t alloc_data);
/**
*
*/
KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args);
/**
*
......@@ -299,19 +325,50 @@ KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pendin
KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid);
KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
struct bencode *v,
ks_dht_storageitem_key_t *k,
ks_bool_t clone_v);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
const uint8_t *value,
ks_size_t value_length);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
struct bencode *v,
ks_bool_t clone_v,
ks_dht_storageitem_pkey_t *pk,
struct bencode *salt,
ks_bool_t clone_salt,
int64_t sequence,
ks_dht_storageitem_signature_t *signature);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
const uint8_t *value,
ks_size_t value_length,
ks_dht_storageitem_pkey_t *pk,
const uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
ks_dht_storageitem_signature_t *signature);
KS_DECLARE(void) ks_dht_storageitem_update_mutable(ks_dht_storageitem_t *item, struct bencode *v, int64_t sequence, ks_dht_storageitem_signature_t *signature);
KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);
/**
......
......@@ -211,10 +211,9 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
*/
if (d->storageitems_hash) {
for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_dht_storageitem_t *val;
ks_hash_this_val(it, (void **)&val);
const void *key = NULL;
ks_dht_storageitem_t *val = NULL;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_storageitem_destroy(&val);
}
ks_hash_destroy(&d->storageitems_hash);
......@@ -232,18 +231,18 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
*/
if (d->searches6_hash) {
for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_dht_search_t *val;
ks_hash_this_val(it, (void **)&val);
const void *key = NULL;
ks_dht_search_t *val = NULL;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_search_destroy(&val);
}
ks_hash_destroy(&d->searches6_hash);
}
if (d->searches4_hash) {
for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_dht_search_t *val;
ks_hash_this_val(it, (void **)&val);
const void *key = NULL;
ks_dht_search_t *val = NULL;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_search_destroy(&val);
}
ks_hash_destroy(&d->searches4_hash);
......@@ -652,8 +651,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *sea
char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_log(KS_LOG_DEBUG,
"Search for %s pending find_node to %s has expired without response\n",
ks_dht_hexid(&value->target, id_buf),
ks_dht_hexid(&v->nodeid, id2_buf));
ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE));
v->finished = KS_TRUE;
continue;
}
......@@ -691,7 +690,7 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
else if (value->expiration <= now) {
// if the transaction expires, so does the attached job, but the job may try again with a new transaction
value->job->state = KS_DHT_JOB_STATE_EXPIRING;
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration);
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) {
......@@ -736,16 +735,16 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
}
}
KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
{
char *t = buffer;
ks_assert(id);
ks_assert(data);
ks_assert(buffer);
memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1);
memset(buffer, 0, len * 2 + 1);
for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]);
for (int i = 0; i < len; ++i, t += 2) sprintf(t, "%02X", data[i]);
return buffer;
}
......@@ -933,10 +932,10 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args,
KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_pkey(struct bencode *args,
ks_bool_t optional,
const char *key,
ks_dht_storageitem_key_t **sikey)
ks_dht_storageitem_pkey_t **pkey)
{
struct bencode *k;
const char *kv;
......@@ -945,9 +944,9 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *a
ks_assert(args);
ks_assert(key);
ks_assert(sikey);
ks_assert(pkey);
*sikey = NULL;
*pkey = NULL;
k = ben_dict_get_by_str(args, key);
if (!k) {
......@@ -960,12 +959,12 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *a
kv = ben_str_val(k);
kv_len = ben_str_len(k);
if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) {
if (kv_len != KS_DHT_STORAGEITEM_PKEY_SIZE) {
ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len);
return KS_STATUS_ARG_INVALID;
}
*sikey = (ks_dht_storageitem_key_t *)kv;
*pkey = (ks_dht_storageitem_pkey_t *)kv;
done:
return ret;
......@@ -1046,35 +1045,54 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *ra
return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target)
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target)
{
SHA_CTX sha;
const uint8_t *v;
uint8_t *v = NULL;
size_t v_len;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(value);
ks_assert(target);
v = (const uint8_t *)ben_str_val(value);
v_len = ben_str_len(value);
v = ben_encode(&v_len, value);
if (!SHA1_Init(&sha) ||
!SHA1_Update(&sha, v, v_len) ||
!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
!SHA1_Final(target->id, &sha)) {
ret = KS_STATUS_FAIL;
}
free(v);
return KS_STATUS_SUCCESS;
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target)
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target)
{
struct bencode *v = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(value);
ks_assert(value_length > 0);
ks_assert(target);
v = ben_blob(value, value_length);
ret = ks_dht_storageitem_target_immutable_internal(v, target);
ben_free(v);
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target)
{
SHA_CTX sha;
//char buf1[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(k);
ks_assert(pk);
ks_assert(target);
if (!SHA1_Init(&sha) ||
!SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL;
!SHA1_Update(&sha, pk->key, KS_DHT_STORAGEITEM_PKEY_SIZE)) return KS_STATUS_FAIL;
if (salt) {
const uint8_t *s = (const uint8_t *)ben_str_val(salt);
size_t s_len = ben_str_len(salt);
......@@ -1082,9 +1100,156 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key
}
if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
//ks_log(KS_LOG_DEBUG, "Mutable ID: %s\n", ks_dht_hex(target->id, buf1, KS_DHT_NODEID_SIZE));
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target)
{
struct bencode *s = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(pk);
ks_assert(target);
if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
ret = ks_dht_storageitem_target_mutable_internal(pk, s, target);
if (s) ben_free(s);
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_encode(uint8_t **encoded,
ks_size_t *encoded_length,
struct bencode *salt,
struct bencode *seq,
struct bencode *v)
{
char *enc = NULL;
char *salt_enc = NULL;
size_t salt_enc_length = 0;
char *seq_enc = NULL;
size_t seq_enc_length = 0;
char *v_enc = NULL;
size_t v_enc_length = 0;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(encoded);
ks_assert(encoded_length);
ks_assert(seq);
ks_assert(v);
if (salt) salt_enc = ben_encode(&salt_enc_length, salt);
seq_enc = ben_encode(&seq_enc_length, seq);
v_enc = ben_encode(&v_enc_length, v);
*encoded_length = (salt ? 6 : 0) + // 4:salt
salt_enc_length +
5 + // 3:seq
seq_enc_length +
3 + // 1:v
v_enc_length;
enc = malloc((*encoded_length) + 1);
*encoded = (uint8_t *)enc;
enc[0] = '\0';
if (salt) {
strncat(enc, "4:salt", 6);
strncat(enc, salt_enc, salt_enc_length);
}
strncat(enc, "3:seq", 5);
strncat(enc, seq_enc, seq_enc_length);
strncat(enc, "1:v", 3);
strncat(enc, v_enc, v_enc_length);
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate_internal(ks_dht_storageitem_signature_t *sig,
ks_dht_storageitem_skey_t *sk,
struct bencode *salt,
struct bencode *seq,
struct bencode *v)
{
uint8_t *tmpsig = NULL;
size_t tmpsig_len = 0;
ks_status_t ret = KS_STATUS_SUCCESS;
//char buf1[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1];
ks_assert(sig);
ks_assert(sk);
ks_assert(seq);
ks_assert(v);
if ((ret = ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v)) != KS_STATUS_SUCCESS) goto done;
if (crypto_sign_detached(sig->sig, NULL, tmpsig, tmpsig_len, sk->key) != 0) {
ret = KS_STATUS_FAIL;
goto done;
}
//ks_log(KS_LOG_DEBUG, "Signed: %s\n", ks_dht_hex(sig->sig, buf1, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
done:
if (tmpsig) free(tmpsig);
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
ks_dht_storageitem_skey_t *sk,
const uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
const uint8_t *value,
ks_size_t value_length)
{
struct bencode *s = NULL;
struct bencode *seq = NULL;
struct bencode *v = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(sig);
ks_assert(sk);
ks_assert(sequence > 0);
ks_assert(value);
ks_assert(value_length > 0);
if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
seq = ben_int(sequence);
v = ben_blob(value, value_length);
ret = ks_dht_storageitem_signature_generate_internal(sig, sk, s, seq, v);
if (s) ben_free(s);
ben_free(seq);
ben_free(v);
return ret;
}
KS_DECLARE(ks_bool_t) ks_dht_storageitem_signature_verify(ks_dht_storageitem_signature_t *sig,
ks_dht_storageitem_pkey_t *pk,
struct bencode *salt,
struct bencode *seq,
struct bencode *v)
{
uint8_t *tmpsig = NULL;
size_t tmpsig_len = 0;
int32_t res = 0;
ks_assert(sig);
ks_assert(pk);
ks_assert(seq);
ks_assert(v);
if (ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v) != KS_STATUS_SUCCESS) return KS_FALSE;
res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, pk->key);
if (tmpsig) free(tmpsig);
return res == 0;
}
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1];
......@@ -1288,7 +1453,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
memcpy(query, qv, qv_len);
query[qv_len] = '\0';
ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
//ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
a = ben_dict_get_by_str(message->data, "a");
if (!a) {
......@@ -1302,7 +1467,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
message->args_id = *id;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
*id,
KS_DHT_REMOTE,
......@@ -1347,7 +1512,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
message->args_id = *id;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
*id,
KS_DHT_REMOTE,
......@@ -1356,7 +1521,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
&node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
......@@ -1377,9 +1542,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
transaction->job->raddr.port);
} else {
transaction->job->response = message;
transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
message->transaction = transaction;
if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
transaction->job->response = NULL; // message is destroyed after we return, stop using it
transaction->finished = KS_TRUE;
}
......@@ -1502,6 +1668,46 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
return ret;
}
KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht)
{
ks_assert(dht);
ks_hash_read_lock(dht->storageitems_hash);
}
KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht)
{
ks_assert(dht);
ks_hash_read_unlock(dht->storageitems_hash);
}
KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht)
{
ks_assert(dht);
ks_hash_write_lock(dht->storageitems_hash);
}
KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht)
{
ks_assert(dht);
ks_hash_write_lock(dht->storageitems_hash);
}
KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target)
{
ks_assert(dht);
ks_assert(target);
return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
}
KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item)
{
ks_assert(dht);
ks_assert(item);
return ks_hash_insert(dht->storageitems_hash, item->id.id, item);
}
KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
......@@ -1632,45 +1838,45 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
{
ks_dht_job_t *first = NULL;
ks_dht_job_t *last = NULL;
ks_assert(dht);
ks_mutex_lock(dht->jobs_mutex);
for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
ks_bool_t remove = KS_FALSE;
ks_bool_t done = KS_FALSE;
jobn = job->next;
switch (job->state) {
case KS_DHT_JOB_STATE_QUERYING:
if (job->state == KS_DHT_JOB_STATE_QUERYING) {
job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
break;
case KS_DHT_JOB_STATE_RESPONDING:
break;
case KS_DHT_JOB_STATE_EXPIRING:
}
if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
job->attempts--;
if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
else {
if (job->finish_callback) job->finish_callback(dht, job);
remove = KS_TRUE;
}
break;
case KS_DHT_JOB_STATE_PROCESSING:
break;
case KS_DHT_JOB_STATE_COMPLETING:
if (job->finish_callback) job->finish_callback(dht, job);
remove = KS_TRUE;
break;
default: break;
else done = KS_TRUE;
}
if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
if (remove) {
if (done) {
if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
else if (!jobp) dht->jobs_first = jobn;
else if (!jobn) dht->jobs_last = jobp;
else jobp->next = jobn;
ks_dht_job_destroy(&job);
job->next = NULL;
if (last) last = last->next = job;
else first = last = job;
} else jobp = job;
}
ks_mutex_unlock(dht->jobs_mutex);
for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
jobn = job->next;
// this cannot occur inside of the main loop, may add new jobs invalidating list pointers
if (job->finish_callback) job->finish_callback(dht, job);
ks_dht_job_destroy(&job);
}
}
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
......@@ -1681,6 +1887,8 @@ KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, k
ks_assert(dht);
ks_assert(raddr);
//ks_log(KS_LOG_DEBUG, "Starting ping!\n");
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
ks_dht_jobs_add(dht, job);
......@@ -1707,7 +1915,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
&message,
NULL)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
//ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
done:
......@@ -1723,7 +1931,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
ks_assert(message);
ks_assert(message->args);
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
//ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
if ((ret = ks_dht_response_setup(dht,
message->endpoint,
......@@ -1733,7 +1941,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
&response,
NULL)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
//ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response);
done:
......@@ -1747,9 +1955,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t
ks_assert(dht);
ks_assert(job);
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
job->state = KS_DHT_JOB_STATE_COMPLETING;
//ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
// done:
return ret;
......@@ -1805,7 +2011,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
ben_dict_set(a, ben_blob("want", 4), want);
}
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
//ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
done:
......@@ -1850,7 +2056,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
want6 = message->raddr.family == AF_INET6;
}
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
//ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
query.nodeid = *target;
......@@ -1869,7 +2075,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
&buffer4_length,
sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
......@@ -1886,7 +2093,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
&buffer6_length,
sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
......@@ -1902,7 +2110,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
//ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response);
done:
......@@ -1964,11 +2172,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes[job->response_nodes_count++] = node;
......@@ -2000,9 +2208,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG,
"Set closer node id %s (%s) in search of target id %s at results index %d\n",
ks_dht_hexid(&nid, id_buf),
ks_dht_hexid(&distance, id2_buf),
ks_dht_hexid(&search->target, id3_buf),
ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
results_index);
search->results[results_index] = nid;
search->distances[results_index] = distance;
......@@ -2026,11 +2234,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
......@@ -2062,9 +2270,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG,
"Set closer node id %s (%s) in search of target id %s at results index %d\n",
ks_dht_hexid(&nid, id_buf),
ks_dht_hexid(&distance, id2_buf),
ks_dht_hexid(&search->target, id3_buf),
ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
results_index);
search->results[results_index] = nid;
search->distances[results_index] = distance;
......@@ -2079,9 +2287,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
}
}
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
job->state = KS_DHT_JOB_STATE_COMPLETING;
//ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
done:
if(search) ks_mutex_unlock(search->mutex);
......@@ -2130,7 +2336,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query get\n");
//ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
......@@ -2163,7 +2369,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
//ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
......@@ -2190,7 +2396,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
&buffer4_length,
sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
......@@ -2207,7 +2414,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
&buffer6_length,
sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
......@@ -2225,7 +2433,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
if (item) {
if (item->mutable) {
if (!sequence_snuffed) {
ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE));
ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE));
ben_dict_set(r, ben_blob("sig", 3), ben_blob(item->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
}
ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
......@@ -2235,7 +2443,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
ks_log(KS_LOG_DEBUG, "Sending message response get\n");
//ks_log(KS_LOG_DEBUG, "Sending message response get\n");
ks_q_push(dht->send_q, (void *)response);
done:
......@@ -2246,7 +2454,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
{
ks_dht_storageitem_t *item = NULL;
ks_dht_token_t *token = NULL;
ks_dht_storageitem_key_t *k = NULL;
ks_dht_storageitem_pkey_t *k = NULL;
ks_dht_storageitem_signature_t *sig = NULL;
struct bencode *seq;
int64_t sequence = -1;
......@@ -2271,7 +2479,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
job->response_token = *token;
if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_pkey(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
seq = ben_dict_get_by_str(job->response->args, "seq");
......@@ -2297,7 +2505,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
nodes6_size = ben_str_len(n);
}
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
//ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
while (nodes_len < nodes_size) {
ks_dht_nodeid_t nid;
......@@ -2308,11 +2516,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes[job->response_nodes_count++] = node;
}
......@@ -2325,11 +2533,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
}
......@@ -2343,72 +2551,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
if (!seq) {
// immutable
if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_storageitem_target_immutable_internal(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
else if ((ret = ks_dht_storageitem_create_immutable(&item,
else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
dht->pool,
&tmptarget,
v)) != KS_STATUS_SUCCESS) goto done;
v,
KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
} else {
// mutable
struct bencode *tmp = NULL;
uint8_t *tmpsig = NULL;
size_t tmpsig_len = 0;
int32_t res = 0;
if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
ks_log(KS_LOG_DEBUG, "Mutable data hash does not match requested target id\n");
ret = KS_STATUS_FAIL;
goto done;
}
tmp = ben_dict();
if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt));
ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq));
ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v));
tmpsig = ben_encode(&tmpsig_len, tmp);
ben_free(tmp);
res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key);
free(tmpsig);
if (res) {
ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n");
if (!ks_dht_storageitem_signature_verify(sig, k, job->query_salt, seq, v)) {
ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
ret = KS_STATUS_FAIL;
goto done;
}
ks_log(KS_LOG_DEBUG, "Signature verified for %s\n", ks_dht_hex(tmptarget.id, id_buf, KS_DHT_NODEID_SIZE));
if (olditem) {
if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
else {
ks_hash_remove(dht->storageitems_hash, olditem->id.id);
olditem = NULL;
if (olditem->seq > sequence) {
goto done;
}
if (olditem->seq == sequence) {
if (ben_cmp(olditem->v, v) != 0) {
goto done;
}
} else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
}
if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item,
else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
dht->pool,
&tmptarget,
v,
KS_TRUE,
k,
job->query_salt,
KS_TRUE,
sequence,
sig)) != KS_STATUS_SUCCESS) goto done;
}
if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED);
} else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
if (item) job->response_storageitem = item;
else if (olditem) job->response_storageitem = olditem;
job->state = KS_DHT_JOB_STATE_COMPLETING;
done:
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
......@@ -2417,22 +2615,25 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
return ret;
}
// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers
// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length)
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item)
{
ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(target);
ks_assert(token);
ks_assert(item);
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length);
ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item);
ks_dht_jobs_add(dht, job);
done:
......@@ -2455,9 +2656,17 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
&message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (job->query_storageitem->mutable) {
if (job->query_cas > 0) ben_dict_set(a, ben_blob("cas", 3), ben_int(job->query_cas));
ben_dict_set(a, ben_blob("k", 1), ben_blob(job->query_storageitem->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE));
if (job->query_storageitem->salt) ben_dict_set(a, ben_blob("salt", 4), ben_clone(job->query_storageitem->salt));
ben_dict_set(a, ben_blob("seq", 3), ben_int(job->query_storageitem->seq));
ben_dict_set(a, ben_blob("sig", 3), ben_blob(job->query_storageitem->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
}
ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE));
ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v));
ks_log(KS_LOG_DEBUG, "Sending message query put\n");
//ks_log(KS_LOG_DEBUG, "Sending message query put\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
......@@ -2465,6 +2674,20 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_dht_token_t *token = NULL;
ks_dht_storageitem_pkey_t *k = NULL;
ks_dht_storageitem_signature_t *sig = NULL;
struct bencode *salt = NULL;
struct bencode *seq = NULL;
int64_t sequence = -1;
struct bencode *cas = NULL;
int64_t cas_seq = -1;
struct bencode *v = NULL;
//ks_size_t v_len = 0;
ks_bool_t storageitems_locked = KS_FALSE;
ks_dht_storageitem_t *item = NULL;
ks_dht_storageitem_t *olditem = NULL;
ks_dht_nodeid_t target;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
......@@ -2473,7 +2696,99 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
ks_assert(message);
ks_assert(message->args);
ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
salt = ben_dict_get_by_str(message->args, "salt");
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
cas = ben_dict_get_by_str(message->args, "cas");
if (cas) cas_seq = ben_int_val(cas);
if (seq && (!k || !sig)) {
ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
v = ben_dict_get_by_str(message->args, "v");
if (!v) {
ks_log(KS_LOG_DEBUG, "Must provide v\n");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
//v_len = ben_str_len(v);
if (!seq) {
// immutable
if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done;
} else {
// mutable
if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done;
}
olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
ks_log(KS_LOG_DEBUG, "Invalid token\n");
ret = KS_STATUS_FAIL;
goto done;
}
//ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
ks_hash_write_lock(dht->storageitems_hash);
storageitems_locked = KS_TRUE;
if (!seq) {
// immutable
if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
dht->pool,
&target,
v,
KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
} else {
// mutable
if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) {
ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) {
if (cas && olditem->seq != cas_seq) {
// @todo send 301 error instead of the response
goto done;
}
if (olditem->seq > sequence) {
// @todo send 302 error instead of the response
goto done;
}
if (olditem->seq == sequence) {
if (ben_cmp(olditem->v, v) != 0) {
// @todo send 201? error instead of the response
goto done;
}
} else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
}
else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
dht->pool,
&target,
v,
KS_TRUE,
k,
salt,
KS_TRUE,
sequence,
sig)) != KS_STATUS_SUCCESS) goto done;
}
if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_response_setup(dht,
message->endpoint,
......@@ -2483,10 +2798,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
&response,
&r)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Sending message response put\n");
//ks_log(KS_LOG_DEBUG, "Sending message response put\n");
ks_q_push(dht->send_q, (void *)response);
done:
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
if (item) ks_dht_storageitem_destroy(&item);
}
return ret;
}
......@@ -2499,8 +2818,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
job->state = KS_DHT_JOB_STATE_COMPLETING;
// done:
return ret;
}
......
......@@ -29,11 +29,12 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION 30
#define KS_DHT_TRANSACTION_EXPIRATION 10
#define KS_DHT_SEARCH_EXPIRATION 10
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
#define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_SKEY_SIZE crypto_sign_SECRETKEYBYTES
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
#define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
#define KS_DHT_STORAGEITEM_EXPIRATION 7200
......@@ -48,7 +49,8 @@ typedef struct ks_dht_datagram_s ks_dht_datagram_t;
typedef struct ks_dht_job_s ks_dht_job_t;
typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
typedef struct ks_dht_token_s ks_dht_token_t;
typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t;
typedef struct ks_dht_storageitem_pkey_s ks_dht_storageitem_pkey_t;
typedef struct ks_dht_storageitem_skey_s ks_dht_storageitem_skey_t;
typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t;
typedef struct ks_dht_message_s ks_dht_message_t;
typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
......@@ -103,7 +105,6 @@ enum ks_dht_job_state_t {
KS_DHT_JOB_STATE_QUERYING,
KS_DHT_JOB_STATE_RESPONDING,
KS_DHT_JOB_STATE_EXPIRING,
KS_DHT_JOB_STATE_PROCESSING,
KS_DHT_JOB_STATE_COMPLETING,
};
......@@ -133,6 +134,9 @@ struct ks_dht_job_s {
// job specific query parameters
ks_dht_nodeid_t query_target;
struct bencode *query_salt;
int64_t query_cas;
ks_dht_token_t query_token;
ks_dht_storageitem_t *query_storageitem;
// job specific response parameters
ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
......@@ -158,8 +162,12 @@ struct ks_dhtrt_querynodes_s {
ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
};
struct ks_dht_storageitem_key_s {
uint8_t key[KS_DHT_STORAGEITEM_KEY_SIZE];
struct ks_dht_storageitem_pkey_s {
uint8_t key[KS_DHT_STORAGEITEM_PKEY_SIZE];
};
struct ks_dht_storageitem_skey_s {
uint8_t key[KS_DHT_STORAGEITEM_SKEY_SIZE];
};
struct ks_dht_storageitem_signature_s {
......@@ -225,8 +233,9 @@ struct ks_dht_storageitem_s {
struct bencode *v;
ks_bool_t mutable;
ks_dht_storageitem_key_t pk;
ks_dht_storageitem_key_t sk;
ks_mutex_t *mutex;
ks_dht_storageitem_pkey_t pk;
ks_dht_storageitem_skey_t sk;
struct bencode *salt;
int64_t seq;
ks_dht_storageitem_signature_t sig;
......@@ -361,8 +370,72 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
*/
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
ks_dht_storageitem_skey_t *sk,
const uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
const uint8_t *value,
ks_size_t value_length);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
......@@ -370,6 +443,16 @@ KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
uint8_t *salt,
ks_size_t salt_length);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item);
/**
* Create a network search of the closest nodes to a target.
* @param dht pointer to the dht instance
......@@ -391,34 +474,6 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
ks_dht_search_t **search);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool,
ks_dht_endpoint_t *endpoint,
const ks_sockaddr_t *raddr,
ks_bool_t alloc_data);
/**
*
*/
KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args);
/**
* route table methods
*
......
......@@ -78,18 +78,20 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length)
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(target);
ks_assert(token);
ks_assert(item);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_target = *target;
if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
job->query_token = *token;
job->query_cas = cas;
job->query_storageitem = item;
}
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
......
......@@ -105,7 +105,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
memcpy(message->type, yv, yv_len);
message->type[yv_len] = '\0';
ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type);
//ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type);
return KS_STATUS_SUCCESS;
}
......
......@@ -44,9 +44,9 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
if (s->pending) {
for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_dht_search_pending_t *val;
ks_hash_this_val(it, (void **)&val);
const void *key = NULL;
ks_dht_search_pending_t *val = NULL;
ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_search_pending_destroy(&val);
}
ks_hash_destroy(&s->pending);
......
......@@ -2,7 +2,11 @@
#include "ks_dht-int.h"
#include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v)
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
struct bencode *v,
ks_bool_t clone_v)
{
ks_dht_storageitem_t *si;
ks_status_t ret = KS_STATUS_SUCCESS;
......@@ -19,16 +23,9 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
si->id = *target;
si->mutable = KS_FALSE;
si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
si->v = ben_clone(v);
si->v = clone_v ? ben_clone(v) : v;
ks_assert(si->v);
//enc = ben_encode(&enc_len, si->v);
//ks_assert(enc);
//SHA1_Init(&sha);
//SHA1_Update(&sha, enc, enc_len);
//SHA1_Final(si->id.id, &sha);
//free(enc);
// done:
if (ret != KS_STATUS_SUCCESS) {
if (si) ks_dht_storageitem_destroy(item);
......@@ -36,12 +33,34 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
const uint8_t *value,
ks_size_t value_length)
{
struct bencode *v = NULL;
ks_assert(item);
ks_assert(pool);
ks_assert(value);
ks_assert(value_length > 0);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
v = ben_blob(value, value_length);
ks_assert(v);
return ks_dht_storageitem_create_immutable_internal(item, pool, target, v, KS_FALSE);
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
struct bencode *v,
ks_dht_storageitem_key_t *k,
ks_bool_t clone_v,
ks_dht_storageitem_pkey_t *pk,
struct bencode *salt,
ks_bool_t clone_salt,
int64_t sequence,
ks_dht_storageitem_signature_t *signature)
{
......@@ -52,7 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
ks_assert(pool);
ks_assert(v);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
ks_assert(k);
ks_assert(pk);
ks_assert(signature);
*item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
......@@ -62,12 +81,15 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
si->id = *target;
si->mutable = KS_TRUE;
si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
si->v = ben_clone(v);
si->v = clone_v ? ben_clone(v) : v;
ks_assert(si->v);
si->pk = *k;
ks_mutex_create(&si->mutex, KS_MUTEX_FLAG_DEFAULT, si->pool);
ks_assert(si->mutex);
si->pk = *pk;
if (salt) {
si->salt = ben_clone(salt);
si->salt = clone_salt ? ben_clone(salt) : salt;
ks_assert(si->salt);
}
si->seq = sequence;
......@@ -80,6 +102,48 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
const uint8_t *value,
ks_size_t value_length,
ks_dht_storageitem_pkey_t *pk,
const uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
ks_dht_storageitem_signature_t *signature)
{
struct bencode *v = NULL;
struct bencode *s = NULL;
ks_assert(item);
ks_assert(pool);
ks_assert(value);
ks_assert(value_length > 0);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
ks_assert(pk);
ks_assert(signature);
v = ben_blob(value, value_length);
if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
return ks_dht_storageitem_create_mutable_internal(item, pool, target, v, KS_FALSE, pk, s, KS_FALSE, sequence, signature);
}
KS_DECLARE(void) ks_dht_storageitem_update_mutable(ks_dht_storageitem_t *item, struct bencode *v, int64_t sequence, ks_dht_storageitem_signature_t *signature)
{
ks_assert(item);
ks_assert(v);
ks_assert(sequence);
ks_assert(signature);
ks_mutex_lock(item->mutex);
ben_free(item->v);
item->v = ben_clone(v);
item->seq = sequence;
item->sig = *signature;
ks_mutex_unlock(item->mutex);
}
/**
*
*/
......@@ -96,6 +160,7 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
ben_free(si->v);
si->v = NULL;
}
if (si->mutex) ks_mutex_destroy(&si->mutex);
if (si->salt) {
ben_free(si->salt);
si->salt = NULL;
......
......@@ -3,14 +3,32 @@
#include <../dht/ks_dht-int.h>
#include <tap.h>
#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
ks_dht_storageitem_skey_t sk;
ks_dht_storageitem_pkey_t pk;
ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
diag("dht_z_callback\n");
ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
diag("dht2_put_callback\n");
return KS_STATUS_SUCCESS;
}
ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
char buf[KS_DHT_TOKEN_SIZE * 2 + 1];
const char *v = "Hello World!";
size_t v_len = strlen(v);
ks_dht_storageitem_signature_t sig;
ks_dht_storageitem_t *mutable = NULL;
diag("dht2_get_token_callback %s\n", ks_dht_hex(job->response_token.token, buf, KS_DHT_TOKEN_SIZE));
ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
// @todo check if exists
ks_dht_storageitem_create_mutable(&mutable, dht->pool, &job->query_target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
mutable->sk = sk;
ks_dht_storageitems_insert(dht, mutable);
ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable);
return KS_STATUS_SUCCESS;
}
......@@ -30,6 +48,27 @@ int main() {
ks_sockaddr_t raddr1;
//ks_sockaddr_t raddr2;
//ks_sockaddr_t raddr3;
ks_dht_nodeid_t target;
//ks_dht_storageitem_t *immutable = NULL;
//ks_dht_storageitem_t *mutable = NULL;
//const char *v = "Hello World!";
//size_t v_len = strlen(v);
//ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b,
//0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d,
//0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22,
//0xe7, 0x54, 0x94, 0x1a, 0xc7, 0x84, 0x79, 0xd6, 0xc5, 0x4e, 0x1f, 0xaf, 0x60, 0x37, 0x88, 0x1d } };
//ks_dht_storageitem_pkey_t pk; //= { { 0x77, 0xff, 0x84, 0x90, 0x5a, 0x91, 0x93, 0x63, 0x67, 0xc0, 0x13, 0x60, 0x80, 0x31, 0x04, 0xf9,
//0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } };
//uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE];
//uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE];
//ks_dht_storageitem_signature_t sig;
//char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1];
//char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1];
//const char *test1vector = "3:seqi1e1:v12:Hello World!";
//const char *test1vector = "4:salt6:foobar3:seqi1e1:v12:Hello World!";
//size_t test1vector_len = strlen(test1vector);
//uint8_t test1vector_sig[KS_DHT_STORAGEITEM_SIGNATURE_SIZE];
//char test1vector_buf[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1];
err = ks_init();
ok(!err);
......@@ -62,9 +101,6 @@ int main() {
err = ks_dht_create(&dht3, NULL, NULL);
ok(err == KS_STATUS_SUCCESS);
ks_dht_register_type(dht1, "z", dht_z_callback);
if (have_v4) {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
ok(err == KS_STATUS_SUCCESS);
......@@ -111,30 +147,9 @@ int main() {
ok(err == KS_STATUS_SUCCESS);
}
//diag("Custom type tests\n");
//buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
//memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
//dht1->recv_buffer_length = buflen;
//err = ks_dht_process(dht1, ep1, &raddr);
//ok(err == KS_STATUS_SUCCESS);
//ks_dht_pulse(dht1, 100);
//ks_dht_pulse(&dht2, 100);
//buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
//memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);
//dht1->recv_buffer_length = buflen;
//err = ks_dht_process(dht1, &raddr);
//ok(err == KS_STATUS_SUCCESS);
/*
diag("Ping test\n");
//ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
......@@ -147,7 +162,7 @@ int main() {
ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
ks_dht_pulse(dht2, 100); // (COMPLETING)
ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) {
......@@ -157,31 +172,92 @@ int main() {
ks_dht_pulse(dht2, 100);
}
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
*/
// Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
//diag("Get test\n");
diag("Find_Node test\n");
ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
/*
ks_dht_storageitem_target_immutable((uint8_t *)v, v_len, &target);
ks_dht_storageitem_create_immutable(&immutable, dht1->pool, &target, (uint8_t *)v, v_len);
ks_dht_storageitems_insert(dht1, immutable);
*/
ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
/*
crypto_sign_keypair(pk.key, sk.key);
ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
ks_dht_storageitem_create_mutable(&mutable, dht1->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
mutable->sk = sk;
ks_dht_storageitems_insert(dht1, mutable);
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0);
ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
ks_dht_pulse(dht2, 100); // send get query
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(dht1, 100); // receive get query and send get response
ks_dht_pulse(dht2, 100); // receive get response
ok(ks_dht_storageitems_find(dht2, &target) != NULL); // item should be verified and stored
ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
*/
diag("Put test\n");
crypto_sign_keypair(pk.key, sk.key);
ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
ks_dht_pulse(dht2, 100); // send get query
ks_dht_pulse(dht1, 100); // receive get query and send get response
ks_dht_pulse(dht2, 100); // receive get response
ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query
ks_dht_pulse(dht1, 100); // receive put query and send put response
ks_dht_pulse(dht2, 100); // receive put response
ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING)
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) {
//diag("DHT 1\n");
ks_dht_pulse(dht1, 100);
//diag("DHT 2\n");
ks_dht_pulse(dht2, 100);
}
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
// Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
//diag("Find_Node test\n");
//ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
//ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
//ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
//ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
//ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
//ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
//diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
//for (int i = 0; i < 10; ++i) {
//diag("DHT 1\n");
//ks_dht_pulse(dht1, 100);
//diag("DHT 2\n");
//ks_dht_pulse(dht2, 100);
//}
//ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
/* Cleanup and shutdown */
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论