提交 58e48a54 authored 作者: colm's avatar colm 提交者: Mike Jerris

FS-9775: Add flags to dhtrt_create_node (merge)

上级 5dfd6d1b
...@@ -542,6 +542,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ...@@ -542,6 +542,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
KS_DHT_LOCAL, KS_DHT_LOCAL,
ep->addr.host, ep->addr.host,
ep->addr.port, ep->addr.port,
KS_DHTRT_CREATE_DEFAULT,
&ep->node)) != KS_STATUS_SUCCESS) goto done; &ep->node)) != KS_STATUS_SUCCESS) goto done;
} else { } else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool)) != KS_STATUS_SUCCESS) goto done; if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool)) != KS_STATUS_SUCCESS) goto done;
...@@ -550,6 +551,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ...@@ -550,6 +551,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
KS_DHT_LOCAL, KS_DHT_LOCAL,
ep->addr.host, ep->addr.host,
ep->addr.port, ep->addr.port,
KS_DHTRT_CREATE_DEFAULT,
&ep->node)) != KS_STATUS_SUCCESS) goto done; &ep->node)) != KS_STATUS_SUCCESS) goto done;
} }
/** /**
...@@ -1473,6 +1475,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me ...@@ -1473,6 +1475,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
KS_DHT_REMOTE, KS_DHT_REMOTE,
message->raddr.host, message->raddr.host,
message->raddr.port, message->raddr.port,
KS_DHTRT_CREATE_DEFAULT,
&node)) != KS_STATUS_SUCCESS) goto done; &node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
...@@ -1518,6 +1521,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t ...@@ -1518,6 +1521,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
KS_DHT_REMOTE, KS_DHT_REMOTE,
message->raddr.host, message->raddr.host,
message->raddr.port, message->raddr.port,
KS_DHTRT_CREATE_DEFAULT,
&node)) != KS_STATUS_SUCCESS) goto done; &node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
...@@ -2177,7 +2181,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ...@@ -2177,7 +2181,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
addr.port); addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
job->response_nodes[job->response_nodes_count++] = node; job->response_nodes[job->response_nodes_count++] = node;
// @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6 // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
...@@ -2239,7 +2243,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ...@@ -2239,7 +2243,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
addr.port); addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
job->response_nodes6[job->response_nodes6_count++] = node; job->response_nodes6[job->response_nodes6_count++] = node;
// @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6 // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
...@@ -2521,7 +2525,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t ...@@ -2521,7 +2525,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
addr.port); addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
job->response_nodes[job->response_nodes_count++] = node; job->response_nodes[job->response_nodes_count++] = node;
} }
while (nodes6_len < nodes6_size) { while (nodes6_len < nodes6_size) {
...@@ -2538,7 +2542,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t ...@@ -2538,7 +2542,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
addr.port); addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
job->response_nodes6[job->response_nodes6_count++] = node; job->response_nodes6[job->response_nodes6_count++] = node;
} }
......
...@@ -85,13 +85,18 @@ struct ks_dht_nodeid_s { ...@@ -85,13 +85,18 @@ struct ks_dht_nodeid_s {
enum ks_afflags_t { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6}; enum ks_afflags_t { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6};
enum ks_dht_nodetype_t { KS_DHT_REMOTE=0x01, enum ks_dht_nodetype_t { KS_DHT_REMOTE=0x01,
KS_DHT_LOCAL=0x02, KS_DHT_LOCAL=0x02,
KS_DHT_BOTH=KS_DHT_REMOTE+KS_DHT_LOCAL }; KS_DHT_BOTH=KS_DHT_REMOTE+KS_DHT_LOCAL };
enum ks_create_node_flags_t {
KS_DHTRT_CREATE_DEFAULT=0,
KS_DHTRT_CREATE_PING,
KS_DHTRT_CREATE_TOUCH
};
struct ks_dht_node_s { struct ks_dht_node_s {
ks_dht_nodeid_t nodeid; ks_dht_nodeid_t nodeid;
ks_sockaddr_t addr; ks_sockaddr_t addr;
// enum ks_afflags_t family; /* AF_INET or AF_INET6 */
enum ks_dht_nodetype_t type; /* local or remote */ enum ks_dht_nodetype_t type; /* local or remote */
ks_dhtrt_routetable_t* table; ks_dhtrt_routetable_t* table;
ks_rwl_t *reflock; ks_rwl_t *reflock;
...@@ -487,6 +492,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table ...@@ -487,6 +492,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table
ks_dht_nodeid_t nodeid, ks_dht_nodeid_t nodeid,
enum ks_dht_nodetype_t type, enum ks_dht_nodetype_t type,
char* ip, unsigned short port, char* ip, unsigned short port,
enum ks_create_node_flags_t flags,
ks_dht_node_t** node); ks_dht_node_t** node);
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node); KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node);
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
#pragma GCC optimize ("O0") /* # pragma GCC optimize ("O0") */
#include "ks_dht.h" #include "ks_dht.h"
...@@ -159,9 +159,9 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all); ...@@ -159,9 +159,9 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all);
static static
ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table); ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
static static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node); ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node, enum ks_create_node_flags_t flags);
static static
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node); ks_dhtrt_bucket_entry_t* ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node);
static static
ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id); ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
static static
...@@ -279,6 +279,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ...@@ -279,6 +279,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
enum ks_dht_nodetype_t type, enum ks_dht_nodetype_t type,
char *ip, char *ip,
unsigned short port, unsigned short port,
enum ks_create_node_flags_t flags,
ks_dht_node_t **node) ks_dht_node_t **node)
{ {
if (!table || !table->internal) { if (!table || !table->internal) {
...@@ -295,16 +296,36 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ...@@ -295,16 +296,36 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id); ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (bentry != 0) { if (bentry != 0) {
ks_rwl_read_lock(header->bucket->lock);
bentry->tyme = ks_time_now_sec(); bentry->tyme = ks_time_now_sec();
/* touch */
if (flags == KS_DHTRT_CREATE_TOUCH) {
bentry->outstanding_pings = 0;
bentry->touched = 1;
if (bentry->flags == DHTPEER_EXPIRED) {
--header->bucket->expired_count;
}
}
if (bentry->touched) { if (bentry->touched) {
bentry->flags = DHTPEER_ACTIVE; bentry->flags = DHTPEER_ACTIVE;
} }
/* ping */
if (!bentry->touched && !bentry->outstanding_pings) {
ks_dhtrt_ping(internal, bentry);
}
tnode = bentry->gptr; tnode = bentry->gptr;
ks_rwl_read_unlock(header->bucket->lock);
ks_rwl_read_lock( tnode->reflock); ks_rwl_read_lock( tnode->reflock);
ks_rwl_read_unlock(internal->lock); ks_rwl_read_unlock(internal->lock);
(*node) = tnode; (*node) = tnode;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -313,13 +334,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ...@@ -313,13 +334,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
tnode = ks_dhtrt_make_node(table); tnode = ks_dhtrt_make_node(table);
tnode->table = table; tnode->table = table;
enum ks_afflags_t family; enum ks_afflags_t family = AF_INET;
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
if (ip[i] == ':') { if (ip[i] == ':') {
family = AF_INET6; break; family = AF_INET6;
break;
} else if (ip[i] == '.') { } else if (ip[i] == '.') {
family = AF_INET; break; family = AF_INET;
break;
} }
} }
...@@ -333,7 +356,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ...@@ -333,7 +356,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
ks_status_t s = ks_dhtrt_insert_node(table, tnode); ks_status_t s = ks_dhtrt_insert_node(table, tnode, flags);
if (tnode && s == KS_STATUS_SUCCESS) { if (tnode && s == KS_STATUS_SUCCESS) {
ks_rwl_read_lock( tnode->reflock); ks_rwl_read_lock( tnode->reflock);
...@@ -387,7 +410,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh ...@@ -387,7 +410,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
} }
static static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node) ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node, enum ks_create_node_flags_t flags)
{ {
if (!table || !table->internal) { if (!table || !table->internal) {
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
...@@ -420,9 +443,9 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no ...@@ -420,9 +443,9 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
/* first - seek a stale entry to eject */ /* first - seek a stale entry to eject */
if (bucket->expired_count) { if (bucket->expired_count) {
ks_status_t s = ks_dhtrt_insert_id(bucket, node); ks_dhtrt_bucket_entry_t* entry = ks_dhtrt_insert_id(bucket, node);
if (s == KS_STATUS_SUCCESS) { if (entry != NULL) {
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
#endif #endif
...@@ -503,7 +526,20 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no ...@@ -503,7 +526,20 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
ks_log(KS_LOG_DEBUG, " ...into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer)); ks_log(KS_LOG_DEBUG, " ...into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif #endif
ks_status_t s = ks_dhtrt_insert_id(bucket, node); ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_bucket_entry_t* entry = ks_dhtrt_insert_id(bucket, node);
if (entry != NULL) {
s = KS_STATUS_SUCCESS;
/* touch */
if (flags == KS_DHTRT_CREATE_TOUCH) {
entry->flags = DHTPEER_ACTIVE;
entry->touched = 1;
}
else if (flags == KS_DHTRT_CREATE_PING ) {
ks_dhtrt_ping(internal, entry);
}
}
ks_rwl_write_unlock(internal->lock); ks_rwl_write_unlock(internal->lock);
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n",
...@@ -1295,7 +1331,7 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original, ...@@ -1295,7 +1331,7 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
* so at least the static array does away with the need for locking. * so at least the static array does away with the need for locking.
*/ */
static static
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) ks_dhtrt_bucket_entry_t* ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
{ {
/* sanity checks */ /* sanity checks */
if (!bucket || bucket->count > KS_DHT_BUCKETSIZE) { if (!bucket || bucket->count > KS_DHT_BUCKETSIZE) {
...@@ -1327,7 +1363,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) ...@@ -1327,7 +1363,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
ks_log(KS_LOG_DEBUG, "duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix); ks_log(KS_LOG_DEBUG, "duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
#endif #endif
bucket->entries[ix].tyme = ks_time_now_sec(); bucket->entries[ix].tyme = ks_time_now_sec();
return KS_STATUS_SUCCESS; /* already exists : leave flags unchanged */ return &bucket->entries[ix]; /* already exists : leave flags unchanged */
} }
} }
...@@ -1352,10 +1388,10 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) ...@@ -1352,10 +1388,10 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
char buffer[100]; char buffer[100];
ks_log(KS_LOG_DEBUG, "Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free); ks_log(KS_LOG_DEBUG, "Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free);
#endif #endif
return KS_STATUS_SUCCESS; return &bucket->entries[free];
} }
return KS_STATUS_FAIL; return NULL;
} }
static static
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论