提交 0302eca5 authored 作者: colm's avatar colm 提交者: Mike Jerris

FS-9775: Mark suspect and expired nodes. Add to makefile

上级 d0349687
...@@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k ...@@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k
libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_bucket.c
libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c
#aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
...@@ -29,7 +29,7 @@ library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include ...@@ -29,7 +29,7 @@ library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include
library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h
library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h
library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h
library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h src/dht/ks_dht_bucket.h
tests: libks.la tests: libks.la
$(MAKE) -C test tests $(MAKE) -C test tests
......
...@@ -39,30 +39,37 @@ ...@@ -39,30 +39,37 @@
/* change for testing */ /* change for testing */
#define KS_DHT_BUCKETSIZE 20 #define KS_DHT_BUCKETSIZE 20
#define KS_DHTRT_INACTIVETIME (5*60)
#define KS_DHTRT_MAXPING 3
/* peer flags */ /* peer flags */
#define DHTPEER_ACTIVE 0x01 #define DHTPEER_ACTIVE 1
#define DHTPEER_SUSPECT 0x02 #define DHTPEER_SUSPECT 2
#define DHTPEER_EXPIRED 0x04 #define DHTPEER_EXPIRED 3
/* internal structures */ /* internal structures */
typedef struct ks_dhtrt_rw_lock_s {
ks_pool_t* pool;
ks_mutex_t* mutex;
ks_cond_t* rcond;
volatile uint16_t read_count;
ks_cond_t* wcond;
volatile uint16_t write_count; /* hopefully never more than 1 ! */
} ks_dhtrt_rw_lock;
typedef struct ks_dhtrt_bucket_entry_s { typedef struct ks_dhtrt_bucket_entry_s {
ks_time_t tyme; ks_time_t tyme;
unsigned char id[KS_DHT_IDSIZE]; unsigned char id[KS_DHT_IDSIZE];
ks_dhtrt_node* gptr; /* ptr to peer */ ks_dhtrt_node* gptr; /* ptr to peer */
uint8_t inuse; uint8_t inuse;
uint8_t outstanding_pings;
uint8_t flags; /* active, suspect, expired */ uint8_t flags; /* active, suspect, expired */
struct ks_dhtrt_bucket_entry_s* left;
struct ks_dhtrt_bucket_entry_s* right;
struct ks_dhtrt_bucket_entry_s* prev;
} ks_dhtrt_bucket_entry; } ks_dhtrt_bucket_entry;
typedef struct ks_dhtrt_bucket_s { typedef struct ks_dhtrt_bucket_s {
ks_dhtrt_bucket_entry entries[KS_DHT_BUCKETSIZE]; ks_dhtrt_bucket_entry entries[KS_DHT_BUCKETSIZE];
ks_dhtrt_bucket_entry* first; /* sorted order - first*/ uint8_t count;
ks_dhtrt_bucket_entry* last; /* sorted order - last*/ uint8_t expired_count;
ks_dhtrt_bucket_entry* avail; /* available chain */
unsigned short count;
} ks_dhtrt_bucket; } ks_dhtrt_bucket;
...@@ -73,13 +80,14 @@ typedef struct ks_dhtrt_bucket_header { ...@@ -73,13 +80,14 @@ typedef struct ks_dhtrt_bucket_header {
struct ks_dhtrt_bucket_header* left; struct ks_dhtrt_bucket_header* left;
struct ks_dhtrt_bucket_header* right; struct ks_dhtrt_bucket_header* right;
ks_dhtrt_bucket* bucket; ks_dhtrt_bucket* bucket;
unsigned char mask[KS_DHT_IDSIZE]; ks_time_t tyme; /* last processed time */
unsigned char flags; unsigned char mask[KS_DHT_IDSIZE]; /* node id mask */
unsigned char flags;
} ks_dhtrt_bucket_header; } ks_dhtrt_bucket_header;
typedef struct ks_dhtrt_internal_s { typedef struct ks_dhtrt_internal_s {
ks_dhtrt_bucket_header* buckets; ks_dhtrt_bucket_header* buckets; /* root bucketheader */
/* */ /* */
} ks_dhtrt_internal; } ks_dhtrt_internal;
...@@ -150,6 +158,29 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, ...@@ -150,6 +158,29 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
unsigned char* hixor, unsigned char* hixor,
unsigned int max); unsigned int max);
static
void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry);
static
ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock);
static
void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock);
static
void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock);
static
ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock);
static
void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock);
static
void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock);
static
ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock);
static
void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock);
/* debugging */ /* debugging */
#define KS_DHT_DEBUGPRINTF_ #define KS_DHT_DEBUGPRINTF_
...@@ -241,6 +272,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr ...@@ -241,6 +272,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr
if (insanity > 3200) assert(insanity < 3200); if (insanity > 3200) assert(insanity < 3200);
/* first - seek a stale entry to eject */ /* first - seek a stale entry to eject */
if (bucket->expired_count) {
ks_status_t s = ks_dhtrt_insert_id(bucket, peer);
if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
}
/* /*
todo: attempting a ping at at this point would require us todo: attempting a ping at at this point would require us
to suspend this process ... tricky...assume right now we will go ahead and to suspend this process ... tricky...assume right now we will go ahead and
...@@ -312,9 +348,12 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable* table, ks_dhtr ...@@ -312,9 +348,12 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable* table, ks_dhtr
{ {
ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid); ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid);
if (header == 0) return KS_STATUS_FAIL; if (header == 0) return KS_STATUS_FAIL;
if (header->bucket == 0) return KS_STATUS_FAIL;
ks_dhtrt_bucket_entry* e = ks_dhtrt_find_bucketentry(header, nodeid); ks_dhtrt_bucket_entry* e = ks_dhtrt_find_bucketentry(header, nodeid);
if (e != 0) { if (e != 0) {
e->tyme = ks_time_now(); e->tyme = ks_time_now();
e->outstanding_pings = 0;
if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count;
e->flags = DHTPEER_ACTIVE; e->flags = DHTPEER_ACTIVE;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -497,14 +536,32 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table) ...@@ -497,14 +536,32 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table)
ks_dhtrt_bucket_header* header = internal->buckets; ks_dhtrt_bucket_header* header = internal->buckets;
ks_dhtrt_bucket_header* stack[KS_DHT_IDSIZE * 8]; ks_dhtrt_bucket_header* stack[KS_DHT_IDSIZE * 8];
int stackix=0; int stackix=0;
ks_time_t t0 = ks_time_now();
while(header) { while(header) {
stack[stackix++] = header; stack[stackix++] = header;
if (header->bucket) { if (header->bucket) {
/*ks_dhtrt_bucket* b = header->bucket;*/ ks_dhtrt_bucket* b = header->bucket;
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
ks_dhtrt_bucket_entry* e = &b->entries[ix];
} if (e->inuse == 1) {
/* more than n pings outstanding? */
if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
continue;
}
if (e->flags == DHTPEER_SUSPECT) {
ks_dhtrt_ping(e);
continue;
}
ks_time_t tdiff = t0 - e->tyme;
if (tdiff > KS_DHTRT_INACTIVETIME) {
e->flags = DHTPEER_SUSPECT;
ks_dhtrt_ping(e);
}
}
} /* end for each bucket_entry */
} }
header = header->left; header = header->left;
if (header == 0 && stackix > 1) { if (header == 0 && stackix > 1) {
...@@ -513,8 +570,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table) ...@@ -513,8 +570,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table)
header = header->right; header = header->right;
} }
} }
return;
} }
...@@ -570,6 +626,16 @@ void colm() { ...@@ -570,6 +626,16 @@ void colm() {
ks_dhtrt_xorcmp(0, 0, 0); ks_dhtrt_xorcmp(0, 0, 0);
ks_dhtrt_split_bucket(0, 0, 0); ks_dhtrt_split_bucket(0, 0, 0);
ks_dhtrt_shiftleft(0); ks_dhtrt_shiftleft(0);
ks_dhtrt_initrwlock( 0);
ks_dhtrt_deinitrwlock( 0);
ks_dhtrt_getreadlock( 0);
ks_dhtrt_getwritelock( 0);
ks_dhtrt_tryreadlock( 0);
ks_dhtrt_trywritelock( 0);
ks_dhtrt_releasereadlock( 0);
ks_dhtrt_releasewritelock( 0);
} }
...@@ -697,6 +763,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) ...@@ -697,6 +763,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
assert(0); assert(0);
} }
uint8_t free = KS_DHT_BUCKETSIZE; uint8_t free = KS_DHT_BUCKETSIZE;
uint8_t expiredix = KS_DHT_BUCKETSIZE;
/* find free .. but also check that it is not already here! */ /* find free .. but also check that it is not already here! */
uint8_t ix = 0; uint8_t ix = 0;
...@@ -705,6 +772,9 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) ...@@ -705,6 +772,9 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
if (free == KS_DHT_BUCKETSIZE) { if (free == KS_DHT_BUCKETSIZE) {
free = ix; /* use this one */ free = ix; /* use this one */
} }
}
else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) {
expiredix = ix;
} }
else if (!memcmp(bucket->entries[ix].id, peer->id, KS_DHT_IDSIZE)) { else if (!memcmp(bucket->entries[ix].id, peer->id, KS_DHT_IDSIZE)) {
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
...@@ -716,6 +786,12 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) ...@@ -716,6 +786,12 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
return KS_STATUS_SUCCESS; /* already exists */ return KS_STATUS_SUCCESS; /* already exists */
} }
} }
if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
/* bump this one - but only if we have no other option */
free = expiredix;
--bucket->expired_count;
}
if ( free<KS_DHT_BUCKETSIZE ) { if ( free<KS_DHT_BUCKETSIZE ) {
bucket->entries[free].inuse = 1; bucket->entries[free].inuse = 1;
...@@ -732,7 +808,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) ...@@ -732,7 +808,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
assert(0); /* should not reach this point */
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
...@@ -884,6 +959,20 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xor ...@@ -884,6 +959,20 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xor
return loaded; return loaded;
} }
void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry) {
++entry->outstanding_pings;
/* @todo */
/* set the appropriate command in the node and queue if for processing */
/*ks_dht_node_t* node = entry->gptr; */
#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
printf(" ping queued for nodeid %s count %d\n",
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
#endif
return;
}
/* /*
strictly for shifting the bucketheader mask strictly for shifting the bucketheader mask
so format must be a right filled mask (hex: ..ffffffff) so format must be a right filled mask (hex: ..ffffffff)
...@@ -960,6 +1049,81 @@ static int ks_dhtrt_ismasked(const unsigned char *id, const unsigned char *mask) ...@@ -960,6 +1049,81 @@ static int ks_dhtrt_ismasked(const unsigned char *id, const unsigned char *mask)
return 1; return 1;
} }
static
ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock)
{
ks_status_t s = ks_mutex_create(&lock->mutex, 0, lock->pool);
if (s != KS_STATUS_SUCCESS) return s;
s = ks_cond_create_ex(&lock->rcond, lock->pool, lock->mutex);
if (s != KS_STATUS_SUCCESS) return s;
s = ks_cond_create_ex(&lock->wcond, lock->pool, lock->mutex);
return s;
}
static
void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock)
{
ks_cond_destroy(&lock->rcond);
ks_cond_destroy(&lock->wcond);
ks_mutex_destroy(&lock->mutex);
memset(lock, 0, sizeof(ks_dhtrt_rw_lock));
}
static
void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock)
{
ks_mutex_lock(lock->mutex);
while (lock->write_count > 0) {
ks_cond_wait(lock->rcond);
}
++lock->read_count;
ks_mutex_unlock(lock->mutex);
}
static
ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock)
{
return KS_STATUS_FAIL;
}
static
void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock)
{
ks_mutex_lock(lock->mutex);
--lock->read_count;
if (lock->read_count == 0)
ks_cond_signal(lock->wcond);
ks_mutex_unlock(lock->mutex);
}
static
void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock)
{
ks_mutex_lock(lock->mutex);
while (lock->read_count > 0) {
ks_cond_wait(lock->wcond);
}
++lock->write_count;
ks_mutex_unlock(lock->mutex);
}
static
ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock)
{
return KS_STATUS_FAIL;
}
static
void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock)
{
ks_mutex_lock(lock->mutex);
--lock->write_count;
assert(lock->write_count==0);
ks_cond_broadcast(lock->rcond);
ks_mutex_unlock(lock->mutex);
}
static char* ks_dhtrt_printableid(const unsigned char* id, char* buffer) static char* ks_dhtrt_printableid(const unsigned char* id, char* buffer)
{ {
char* t = buffer; char* t = buffer;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论