提交 340c4f7e authored 作者: Anthony Minessale II's avatar Anthony Minessale II

Merge pull request #1169 in FS/freeswitch from feature/FS-9775-rewrite-dht to master

* commit '57f793a0': (92 commits)
  FS-9775: Remove nodeid from bucket container
  FS-9775: Update testbuckets to latest api
  FS-9775: Update testbuckets to new api
  FS-9775: Committing start of very basic datastore concept, will start tieing in the physical transport layer for replication under new ticket/branch
  fix unqlite build, don't do warn all ansi on it
  FS-9775: Committing to get assistance with building unqlite
  FS-9775: Remove moved h file references
  FS-9775: First round of integration of DHT into libblade, requires ongoing changes to DHT for proper exposure to blade level
  FS-9775: Fixed building libblade with address sanitizing support
  FS-9775: Tweaks, bug fixes, etc. Committing in preparation for introducing into libblade.
  FS-9775:  Implement serialization, deserialization & repopulation for dht table
  FS-9775: A bunch of stuff related to chaining multiple jobs, bug fixes, few other changes
  FS-9775: DHT Repopulate empty buckets
  FS-9775: DHT Process table timing changes & test cleanup
  FS-9775: Some cleanup and bug fixes in DHT, switched to using hash destructors, and added sending of errors to most failed query scenarios
  FS-9775: First tested pass on search functionality, not tested with deep searching at multiple levels
  FS-9775:  Add flags to dhtrt_create_node (merge)
  FS-9775: Bug fixes and exposed interface changes while implementing tests for get/put which are functional and pass initial tests now. Deep searching needs to be revamped now to complete the full announcing process.
  FS-9775:  Exclude newly created nodes from find
  FS-9775: Fix memory reuse.  Remove redundant fields
  ...
Makefile Makefile
Makefile.in Makefile.in
build/*.m4 build/*.m4
configure
差异被折叠。
...@@ -5,14 +5,20 @@ AUTOMAKE_OPTIONS = subdir-objects ...@@ -5,14 +5,20 @@ AUTOMAKE_OPTIONS = subdir-objects
AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libblade.la noinst_LTLIBRARIES = libunqlite.la
libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/bpcp.c libunqlite_la_SOURCES = src/unqlite.c
libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS) libunqlite_la_CFLAGS = -DUNQLITE_ENABLE_THREADS
libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm $(AM_LDFLAGS) libunqlite_la_LIBADD = -lpthread
lib_LTLIBRARIES = libblade.la
libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/bpcp.c src/blade_datastore.c
libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS)
libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm $(AM_LDFLAGS)
libblade_la_LIBADD = libunqlite.la
library_includedir = $(prefix)/include library_includedir = $(prefix)/include
library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/bpcp.h library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/bpcp.h
library_include_HEADERS += test/tap.h library_include_HEADERS += src/include/blade_datastore.h
library_include_HEADERS += src/include/unqlite.h test/tap.h
tests: libblade.la tests: libblade.la
$(MAKE) -C test tests $(MAKE) -C test tests
......
AC_DEFUN([AX_CFLAGS_WARN_ALL_ANSI],[dnl AC_DEFUN([AX_CFLAGS_WARN_ALL_ANSI],[dnl
AS_VAR_PUSHDEF([FLAGS],[CFLAGS])dnl AS_VAR_PUSHDEF([FLAGS],[AM_CFLAGS])dnl
AS_VAR_PUSHDEF([VAR],[ac_cv_cflags_warn_all_ansi])dnl AS_VAR_PUSHDEF([VAR],[ac_cv_cflags_warn_all_ansi])dnl
AC_CACHE_CHECK([m4_ifval($1,$1,FLAGS) for maximum ansi warnings], AC_CACHE_CHECK([m4_ifval($1,$1,FLAGS) for maximum ansi warnings],
VAR,[VAR="no, unknown" VAR,[VAR="no, unknown"
...@@ -47,7 +47,7 @@ AS_VAR_POPDEF([FLAGS])dnl ...@@ -47,7 +47,7 @@ AS_VAR_POPDEF([FLAGS])dnl
dnl the only difference - the LANG selection... and the default FLAGS dnl the only difference - the LANG selection... and the default FLAGS
AC_DEFUN([AX_CXXFLAGS_WARN_ALL_ANSI],[dnl AC_DEFUN([AX_CXXFLAGS_WARN_ALL_ANSI],[dnl
AS_VAR_PUSHDEF([FLAGS],[CXXFLAGS])dnl AS_VAR_PUSHDEF([FLAGS],[AM_CXXFLAGS])dnl
AS_VAR_PUSHDEF([VAR],[ac_cv_cxxflags_warn_all_ansi])dnl AS_VAR_PUSHDEF([VAR],[ac_cv_cxxflags_warn_all_ansi])dnl
AC_CACHE_CHECK([m4_ifval($1,$1,FLAGS) for maximum ansi warnings], AC_CACHE_CHECK([m4_ifval($1,$1,FLAGS) for maximum ansi warnings],
VAR,[VAR="no, unknown" VAR,[VAR="no, unknown"
......
...@@ -241,11 +241,9 @@ AC_ARG_ENABLE(address_sanitizer, ...@@ -241,11 +241,9 @@ AC_ARG_ENABLE(address_sanitizer,
[enable_address_sanitizer="no"]) [enable_address_sanitizer="no"])
if test "${enable_address_sanitizer}" = "yes"; then if test "${enable_address_sanitizer}" = "yes"; then
if test "x${ax_cv_c_compiler_vendor}" = "xclang" ; then AM_CFLAGS="${AM_CFLAGS} -fsanitize=address -fno-omit-frame-pointer"
AM_CFLAGS="${AM_CFLAGS} -fsanitize=address -fno-omit-frame-pointer" AM_CXXFLAGS="${AM_CXXFLAGS} -fsanitize=address -fno-omit-frame-pointer"
AM_CXXFLAGS="${AM_CXXFLAGS} -fsanitize=address -fno-omit-frame-pointer" AM_LDFLAGS="${AM_LDFLAGS} -fsanitize=address"
AM_LDFLAGS="${AM_LDFLAGS} -fsanitize=address"
fi
fi fi
AC_ARG_WITH([libks], AC_ARG_WITH([libks],
......
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
typedef enum {
BDS_NONE = 0,
BDS_MYPOOL = (1 << 0),
} bdspvt_flag_t;
struct blade_datastore_s {
bdspvt_flag_t flags;
ks_pool_t *pool;
unqlite *db;
};
struct blade_datastore_fetch_userdata_s
{
blade_datastore_t *bds;
blade_datastore_fetch_callback_t callback;
void *userdata;
};
typedef struct blade_datastore_fetch_userdata_s blade_datastore_fetch_userdata_t;
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
{
blade_datastore_t *bds = NULL;
bdspvt_flag_t flags;
ks_pool_t *pool;
ks_assert(bdsP);
bds = *bdsP;
*bdsP = NULL;
ks_assert(bds);
flags = bds->flags;
pool = bds->pool;
if (bds->db) {
unqlite_close(bds->db);
bds->db = NULL;
}
ks_pool_free(bds->pool, &bds);
if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool)
{
bdspvt_flag_t newflags = BDS_NONE;
blade_datastore_t *bds = NULL;
if (!pool) {
newflags |= BDS_MYPOOL;
ks_pool_open(&pool);
ks_assert(pool);
}
bds = ks_pool_alloc(pool, sizeof(*bds));
bds->flags = newflags;
bds->pool = pool;
*bdsP = bds;
if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
const char *errbuf = NULL;
blade_datastore_error(bds, &errbuf, NULL);
ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
return KS_STATUS_FAIL;
}
// @todo unqlite_lib_config(UNQLITE_LIB_CONFIG_MEM_ERR_CALLBACK)
// @todo VM init if document store is used (and output consumer callback)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout)
{
ks_assert(bds);
ks_assert(timeout >= 0);
}
KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length)
{
ks_assert(bds);
ks_assert(bds->db);
ks_assert(buffer);
unqlite_config(bds->db, UNQLITE_CONFIG_ERR_LOG, buffer, buffer_length);
}
KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length)
{
int32_t rc;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bds);
ks_assert(bds->db);
ks_assert(key);
ks_assert(key_length > 0);
ks_assert(data);
ks_assert(data_length > 0);
rc = unqlite_begin(bds->db);
if (rc != UNQLITE_OK) {
if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
else {
const char *errbuf;
blade_datastore_error(bds, &errbuf, NULL);
ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
ret = KS_STATUS_FAIL;
}
} else if (unqlite_kv_store(bds->db, key, key_length, data, data_length) == UNQLITE_OK) unqlite_commit(bds->db);
else unqlite_rollback(bds->db);
return ret;
}
int blade_datastore_fetch_callback(const void *data, unsigned int data_length, void *userdata)
{
int rc = UNQLITE_OK;
blade_datastore_fetch_userdata_t *ud = NULL;
ks_assert(data);
ks_assert(data_length > 0);
ks_assert(userdata);
ud = (blade_datastore_fetch_userdata_t *)userdata;
if (!ud->callback(ud->bds, data, data_length, ud->userdata)) rc = UNQLITE_ABORT;
return rc;
}
KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
blade_datastore_fetch_callback_t callback,
const void *key,
int32_t key_length,
void *userdata)
{
int32_t rc;
ks_status_t ret = KS_STATUS_SUCCESS;
blade_datastore_fetch_userdata_t ud;
ks_assert(bds);
ks_assert(bds->db);
ks_assert(callback);
ks_assert(key);
ks_assert(key_length > 0);
ud.bds = bds;
ud.callback = callback;
ud.userdata = userdata;
rc = unqlite_kv_fetch_callback(bds->db, key, key_length, blade_datastore_fetch_callback, &ud);
if (rc != UNQLITE_OK) {
if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
else {
const char *errbuf;
blade_datastore_error(bds, &errbuf, NULL);
ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
ret = KS_STATUS_FAIL;
}
}
return ret;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
...@@ -33,14 +33,22 @@ ...@@ -33,14 +33,22 @@
#include "blade.h" #include "blade.h"
#define KS_DHT_TPOOL_MIN 2
#define KS_DHT_TPOOL_MAX 8
#define KS_DHT_TPOOL_STACK (1024 * 256)
#define KS_DHT_TPOOL_IDLE 10
typedef enum { typedef enum {
BP_NONE = 0, BP_NONE = 0,
BP_MYPOOL = (1 << 0) BP_MYPOOL = (1 << 0),
BP_MYTPOOL = (1 << 1)
} bppvt_flag_t; } bppvt_flag_t;
struct blade_peer_s { struct blade_peer_s {
bppvt_flag_t flags; bppvt_flag_t flags;
ks_pool_t *pool; ks_pool_t *pool;
ks_thread_pool_t *tpool;
ks_dht_t *dht;
}; };
...@@ -60,33 +68,75 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP) ...@@ -60,33 +68,75 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
flags = bp->flags; flags = bp->flags;
pool = bp->pool; pool = bp->pool;
ks_pool_free(bp->pool, bp); if (bp->dht) ks_dht_destroy(&bp->dht);
if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool);
ks_pool_free(bp->pool, &bp);
if (pool && (flags & BP_MYPOOL)) { if (pool && (flags & BP_MYPOOL)) ks_pool_close(&pool);
ks_pool_close(&pool);
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool) KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid)
{ {
bppvt_flag_t newflags = BP_NONE; bppvt_flag_t newflags = BP_NONE;
blade_peer_t *bp = NULL; blade_peer_t *bp = NULL;
ks_dht_t *dht = NULL;
if (!pool) { if (!pool) {
newflags |= BP_MYPOOL; newflags |= BP_MYPOOL;
ks_pool_open(&pool); ks_pool_open(&pool);
ks_assert(pool);
}
if (!tpool) {
newflags |= BP_MYTPOOL;
ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE);
ks_assert(tpool);
} }
ks_dht_create(&dht, pool, tpool, nodeid);
ks_assert(dht);
bp = ks_pool_alloc(pool, sizeof(*bp)); bp = ks_pool_alloc(pool, sizeof(*bp));
bp->pool = pool;
bp->flags = newflags; bp->flags = newflags;
bp->pool = pool;
bp->tpool = tpool;
bp->dht = dht;
*bpP = bp; *bpP = bp;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp)
{
ks_assert(bp);
ks_assert(bp->dht);
return &bp->dht->nodeid;
}
KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port)
{
ks_assert(bp);
ks_dht_autoroute(bp->dht, autoroute, port);
}
KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
{
ks_assert(bp);
ks_assert(addr);
return ks_dht_bind(bp->dht, addr, endpoint);
}
KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout)
{
ks_assert(bp);
ks_assert(timeout >= 0);
ks_dht_pulse(bp->dht, timeout);
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
......
...@@ -35,13 +35,16 @@ ...@@ -35,13 +35,16 @@
typedef enum { typedef enum {
BH_NONE = 0, BH_NONE = 0,
BH_MYPOOL = (1 << 0) BH_MYPOOL = (1 << 0),
BH_MYTPOOL = (1 << 1)
} bhpvt_flag_t; } bhpvt_flag_t;
struct blade_handle_s { struct blade_handle_s {
ks_pool_t *pool;
bhpvt_flag_t flags; bhpvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
blade_peer_t *peer; blade_peer_t *peer;
blade_datastore_t *datastore;
}; };
...@@ -61,9 +64,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) ...@@ -61,9 +64,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
flags = bh->flags; flags = bh->flags;
pool = bh->pool; pool = bh->pool;
if (bh->datastore) blade_datastore_destroy(&bh->datastore);
blade_peer_destroy(&bh->peer); blade_peer_destroy(&bh->peer);
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, bh); ks_pool_free(bh->pool, &bh);
if (pool && (flags & BH_MYPOOL)) { if (pool && (flags & BH_MYPOOL)) {
ks_pool_close(&pool); ks_pool_close(&pool);
...@@ -72,26 +78,116 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) ...@@ -72,26 +78,116 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool) KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid)
{ {
bhpvt_flag_t newflags = BH_NONE; bhpvt_flag_t newflags = BH_NONE;
blade_handle_t *bh = NULL; blade_handle_t *bh = NULL;
ks_dht_nodeid_t nid;
ks_assert(nodeid);
ks_assert(strlen(nodeid) == (KS_DHT_NODEID_SIZE * 2));
if (!pool) { if (!pool) {
newflags |= BH_MYPOOL; newflags |= BH_MYPOOL;
ks_pool_open(&pool); ks_pool_open(&pool);
} }
if (!tpool) {
newflags |= BH_MYTPOOL;
ks_thread_pool_create(&tpool, BLADE_HANDLE_TPOOL_MIN, BLADE_HANDLE_TPOOL_MAX, BLADE_HANDLE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_HANDLE_TPOOL_IDLE);
ks_assert(tpool);
}
bh = ks_pool_alloc(pool, sizeof(*bh)); bh = ks_pool_alloc(pool, sizeof(*bh));
bh->pool = pool;
bh->flags = newflags; bh->flags = newflags;
blade_peer_create(&bh->peer, bh->pool); bh->pool = pool;
bh->tpool = tpool;
ks_dht_dehex(nid.id, nodeid, KS_DHT_NODEID_SIZE);
blade_peer_create(&bh->peer, bh->pool, bh->tpool, &nid);
*bhP = bh; *bhP = bh;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer)
{
ks_dht_nodeid_t *nodeid = NULL;
ks_assert(bh);
ks_assert(bh->peer);
nodeid = blade_peer_myid(bh->peer);
ks_dht_hex(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
}
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port)
{
ks_assert(bh);
ks_assert(bh->peer);
blade_peer_autoroute(bh->peer, autoroute, port);
}
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint)
{
ks_sockaddr_t addr;
int family = AF_INET;
ks_assert(bh);
ks_assert(ip);
ks_assert(port);
if (ip[1] != '.' && ip[2] != '.' && ip[3] != '.') family = AF_INET6;
ks_addr_set(&addr, ip, port, family);
return blade_peer_bind(bh->peer, &addr, endpoint);
}
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout)
{
ks_assert(bh);
ks_assert(timeout >= 0);
blade_peer_pulse(bh->peer, timeout);
if (bh->datastore) blade_datastore_pulse(bh->datastore, timeout);
}
KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh)
{
ks_assert(bh);
if (bh->datastore) return;
blade_datastore_create(&bh->datastore, bh->pool);
}
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
{
ks_assert(bh);
ks_assert(bh->datastore);
ks_assert(key);
ks_assert(key_length > 0);
ks_assert(data);
ks_assert(data_length > 0);
return blade_datastore_store(bh->datastore, key, key_length, data, data_length);
}
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
blade_datastore_fetch_callback_t callback,
const void *key,
int32_t key_length,
void *userdata)
{
ks_assert(bh);
ks_assert(bh->datastore);
ks_assert(callback);
ks_assert(key);
ks_assert(key_length > 0);
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
}
/* For Emacs: /* For Emacs:
......
...@@ -34,10 +34,13 @@ ...@@ -34,10 +34,13 @@
#ifndef _BLADE_H_ #ifndef _BLADE_H_
#define _BLADE_H_ #define _BLADE_H_
#include <ks.h> #include <ks.h>
#include <ks_dht.h>
#include <sodium.h> #include <sodium.h>
#include "unqlite.h"
#include "blade_types.h" #include "blade_types.h"
#include "blade_stack.h" #include "blade_stack.h"
#include "blade_peer.h" #include "blade_peer.h"
#include "blade_datastore.h"
#include "bpcp.h" #include "bpcp.h"
KS_BEGIN_EXTERN_C KS_BEGIN_EXTERN_C
......
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BLADE_DATASTORE_H_
#define _BLADE_DATASTORE_H_
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool);
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP);
KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout);
KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length);
KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
blade_datastore_fetch_callback_t callback,
const void *key,
int32_t key_length,
void *userdata);
KS_END_EXTERN_C
#endif
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
...@@ -35,9 +35,18 @@ ...@@ -35,9 +35,18 @@
#define _BPCP_H_ #define _BPCP_H_
#include <blade.h> #include <blade.h>
#define BLADE_PEER_TPOOL_MIN 2
#define BLADE_PEER_TPOOL_MAX 8
#define BLADE_PEER_TPOOL_STACK (1024 * 256)
#define BLADE_PEER_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool); KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP); KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp);
KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout);
KS_END_EXTERN_C KS_END_EXTERN_C
#endif #endif
......
...@@ -35,9 +35,25 @@ ...@@ -35,9 +35,25 @@
#define _BLADE_STACK_H_ #define _BLADE_STACK_H_
#include <blade.h> #include <blade.h>
#define BLADE_HANDLE_TPOOL_MIN 2
#define BLADE_HANDLE_TPOOL_MAX 8
#define BLADE_HANDLE_TPOOL_STACK (1024 * 256)
#define BLADE_HANDLE_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP); KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP);
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool); KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid);
KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer);
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout);
KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
blade_datastore_fetch_callback_t callback,
const void *key,
int32_t key_length,
void *userdata);
KS_END_EXTERN_C KS_END_EXTERN_C
#endif #endif
......
...@@ -39,6 +39,9 @@ KS_BEGIN_EXTERN_C ...@@ -39,6 +39,9 @@ KS_BEGIN_EXTERN_C
typedef struct blade_handle_s blade_handle_t; typedef struct blade_handle_s blade_handle_t;
typedef struct blade_peer_s blade_peer_t; typedef struct blade_peer_s blade_peer_t;
typedef struct blade_datastore_s blade_datastore_t;
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);
KS_END_EXTERN_C KS_END_EXTERN_C
......
差异被折叠。
差异被折叠。
...@@ -7,6 +7,11 @@ testbuild_SOURCES = testbuild.c tap.c ...@@ -7,6 +7,11 @@ testbuild_SOURCES = testbuild.c tap.c
testbuild_CFLAGS = $(AM_CFLAGS) testbuild_CFLAGS = $(AM_CFLAGS)
testbuild_LDADD = $(TEST_LDADD) testbuild_LDADD = $(TEST_LDADD)
check_PROGRAMS += bladec
bladec_SOURCES = bladec.c tap.c
bladec_CFLAGS = $(AM_CFLAGS)
bladec_LDADD = $(TEST_LDADD)
TESTS=$(check_PROGRAMS) TESTS=$(check_PROGRAMS)
......
#include "blade.h"
#include "tap.h"
#ifdef _WIN32
#define STDIO_FD(_fs) _fileno(_fs)
#define READ(_fd, _buffer, _count) _read(_fd, _buffer, _count)
#else
#define STDIO_FD(_fs) fileno(_fs)
#define READ(_fd, _buffer, _count) read(_fd, _buffer, _count)
#endif
#define CONSOLE_INPUT_MAX 512
ks_bool_t g_shutdown = KS_FALSE;
char g_console_input[CONSOLE_INPUT_MAX];
size_t g_console_input_length = 0;
size_t g_console_input_eol = 0;
void loop(blade_handle_t *bh);
void process_console_input(blade_handle_t *bh, char *line);
typedef void (*command_callback)(blade_handle_t *bh, char *args);
struct command_def_s {
const char *cmd;
command_callback callback;
};
void command_test(blade_handle_t *bh, char *args);
void command_quit(blade_handle_t *bh, char *args);
void command_myid(blade_handle_t *bh, char *args);
void command_bind(blade_handle_t *bh, char *args);
void command_store(blade_handle_t *bh, char *args);
void command_fetch(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "test", command_test },
{ "quit", command_quit },
{ "myid", command_myid },
{ "bind", command_bind },
{ "store", command_store },
{ "fetch", command_fetch },
{ NULL, NULL }
};
int main(int argc, char **argv)
{
blade_handle_t *bh = NULL;
const char *nodeid;
ks_assert(argc >= 2);
nodeid = argv[1];
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
blade_init();
blade_handle_create(&bh, NULL, NULL, nodeid);
blade_handle_autoroute(bh, KS_TRUE, KS_DHT_DEFAULT_PORT);
loop(bh);
blade_handle_destroy(&bh);
blade_shutdown();
return 0;
}
void buffer_console_input(void)
{
ssize_t bytes = 0;
struct pollfd poll[1];
poll[0].fd = STDIO_FD(stdin);
poll[0].events = POLLIN | POLLERR;
if (ks_poll(poll, 1, 1) > 0) {
if (poll[0].revents & POLLIN) {
if ((bytes = READ(poll[0].fd, g_console_input + g_console_input_length, CONSOLE_INPUT_MAX - g_console_input_length)) <= 0) {
// @todo error
return;
}
g_console_input_length += bytes;
}
}
}
void loop(blade_handle_t *bh)
{
while (!g_shutdown) {
ks_bool_t eol = KS_FALSE;
buffer_console_input();
for (; g_console_input_eol < g_console_input_length; ++g_console_input_eol) {
char c = g_console_input[g_console_input_eol];
if (c == '\r' || c == '\n') {
eol = KS_TRUE;
break;
}
}
if (eol) {
g_console_input[g_console_input_eol] = '\0';
process_console_input(bh, g_console_input);
g_console_input_eol++;
for (; g_console_input_eol < g_console_input_length; ++g_console_input_eol) {
char c = g_console_input[g_console_input_eol];
if (c != '\r' && c != '\n') break;
}
if (g_console_input_eol == g_console_input_length) g_console_input_eol = g_console_input_length = 0;
else {
memcpy(g_console_input, g_console_input + g_console_input_eol, g_console_input_length - g_console_input_eol);
g_console_input_length -= g_console_input_eol;
g_console_input_eol = 0;
}
}
if (g_console_input_length == CONSOLE_INPUT_MAX) {
// @todo lines must not exceed 512 bytes, treat as error and ignore buffer until next new line?
ks_assert(0);
}
blade_handle_pulse(bh, 1);
}
}
void parse_argument(char **input, char **arg, char terminator)
{
char *tmp;
ks_assert(input);
ks_assert(*input);
ks_assert(arg);
tmp = *input;
*arg = tmp;
while (*tmp && *tmp != terminator) ++tmp;
if (*tmp == terminator) {
*tmp = '\0';
++tmp;
}
*input = tmp;
}
void process_console_input(blade_handle_t *bh, char *line)
{
char *args = line;
char *cmd = NULL;
ks_bool_t found = KS_FALSE;
ks_log(KS_LOG_DEBUG, "Output: %s\n", line);
parse_argument(&args, &cmd, ' ');
ks_log(KS_LOG_DEBUG, "Command: %s, Args: %s\n", cmd, args);
for (int32_t index = 0; command_defs[index].cmd; ++index) {
if (!strcmp(command_defs[index].cmd, cmd)) {
found = KS_TRUE;
command_defs[index].callback(bh, args);
}
}
if (!found) ks_log(KS_LOG_INFO, "Command '%s' unknown.\n", cmd);
}
void command_test(blade_handle_t *bh, char *args)
{
ks_log(KS_LOG_DEBUG, "Hello World!\n");
}
void command_quit(blade_handle_t *bh, char *args)
{
ks_assert(bh);
ks_assert(args);
ks_log(KS_LOG_DEBUG, "Shutting down\n");
g_shutdown = KS_TRUE;
}
void command_myid(blade_handle_t *bh, char *args)
{
char buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(bh);
ks_assert(args);
blade_handle_myid(bh, buf);
ks_log(KS_LOG_INFO, "%s\n", buf);
}
void command_bind(blade_handle_t *bh, char *args)
{
char *ip = NULL;
char *port = NULL;
ks_port_t p;
ks_assert(args);
parse_argument(&args, &ip, ' ');
parse_argument(&args, &port, ' ');
p = atoi(port); // @todo use strtol for error handling
blade_handle_bind(bh, ip, p, NULL);
}
void command_store(blade_handle_t *bh, char *args)
{
char *key;
char *data;
ks_assert(args);
blade_handle_datastore_start(bh);
parse_argument(&args, &key, ' ');
parse_argument(&args, &data, ' ');
blade_handle_datastore_store(bh, key, strlen(key), data, strlen(data) + 1);
}
ks_bool_t blade_datastore_fetch_callback(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata)
{
ks_log(KS_LOG_INFO, "%s\n", data);
return KS_TRUE;
}
void command_fetch(blade_handle_t *bh, char *args)
{
char *key;
ks_assert(args);
blade_handle_datastore_start(bh);
parse_argument(&args, &key, ' ');
blade_handle_datastore_fetch(bh, blade_datastore_fetch_callback, key, strlen(key), bh);
}
...@@ -11,7 +11,7 @@ int main(void) ...@@ -11,7 +11,7 @@ int main(void)
plan(1); plan(1);
status = blade_handle_create(&bh, NULL); status = blade_handle_create(&bh, NULL, NULL);
status = blade_handle_destroy(&bh); status = blade_handle_destroy(&bh);
ok(status == KS_STATUS_SUCCESS); ok(status == KS_STATUS_SUCCESS);
......
...@@ -45,8 +45,22 @@ Only use // style-comments on tempory comments that will probably be removed eve ...@@ -45,8 +45,22 @@ Only use // style-comments on tempory comments that will probably be removed eve
Add the emacs/vi comment to the bottom of every file. Add the emacs/vi comment to the bottom of every file.
Use Doxygen for function args. Use Doxygen for function args.
Tabs not spaces. Tabs not spaces.
Use flags as bitwise when possible, use arrays if going beyond 32
Typedef all enums using UPPER_CASE notation for the values
*/ */
typedef enum {
SOME_FLAG_X = (1 << 0),
SOME_FLAG_Y = (1 << 1)
} some_flag_type_t;
typedef enum {
SOME_TYPE_X = 1,
SOME_TYPE_Y,
SOME_TYPE_Z
} some_type_t;
KS_DECLARE(ks_status_t) function_example(somedata_t **data, ks_pool_t *pool) KS_DECLARE(ks_status_t) function_example(somedata_t **data, ks_pool_t *pool)
{ {
int var = 3, x = 0; int var = 3, x = 0;
...@@ -54,7 +68,7 @@ KS_DECLARE(ks_status_t) function_example(somedata_t **data, ks_pool_t *pool) ...@@ -54,7 +68,7 @@ KS_DECLARE(ks_status_t) function_example(somedata_t **data, ks_pool_t *pool)
if (!pool) return KS_STATUS_FAIL; if (!pool) return KS_STATUS_FAIL;
for (x = 0; x < 100; x++) { for (x = 0; x < 100; x++) {
var = += x; var += x;
} }
if (var > 20) { if (var > 20) {
......
...@@ -3,16 +3,19 @@ EXTRA_DIST = ...@@ -3,16 +3,19 @@ EXTRA_DIST =
SUBDIRS = . test SUBDIRS = . test
AUTOMAKE_OPTIONS = subdir-objects AUTOMAKE_OPTIONS = subdir-objects
AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt -O0
AM_CPPFLAGS = $(AM_CFLAGS) AM_CPPFLAGS = $(AM_CFLAGS)
lib_LTLIBRARIES = libks.la lib_LTLIBRARIES = libks.la
libks_la_SOURCES = src/ks.c src/ks_string.c src/ks_json.c src/ks_thread.c src/ks_thread_pool.c src/ks_mutex.c src/ks_config.c libks_la_SOURCES = src/ks.c src/ks_string.c src/ks_json.c src/ks_thread.c src/ks_thread_pool.c src/ks_mutex.c src/ks_config.c
libks_la_SOURCES += src/ks_log.c src/ks_socket.c src/ks_buffer.c src/ks_pool.c src/simclist.c libks_la_SOURCES += src/ks_log.c src/ks_socket.c src/ks_buffer.c src/ks_pool.c src/simclist.c
libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/ks_dso.c src/ks_dht.c libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/ks_dso.c # src/ks_dht.c
libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_publish.c src/dht/ks_dht_distribute.c src/dht/ks_dht_storageitem.c
libks_la_SOURCES += src/dht/ks_dht_bucket.c
libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c
#aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
...@@ -24,7 +27,7 @@ library_includedir = $(prefix)/include ...@@ -24,7 +27,7 @@ library_includedir = $(prefix)/include
library_include_HEADERS = src/include/ks_config.h src/include/ks.h src/include/ks_threadmutex.h src/include/ks_json.h src/include/ks_buffer.h library_include_HEADERS = src/include/ks_config.h src/include/ks.h src/include/ks_threadmutex.h src/include/ks_json.h src/include/ks_buffer.h
library_include_HEADERS += src/include/ks_thread_pool.h library_include_HEADERS += src/include/ks_thread_pool.h
library_include_HEADERS += src/include/ks_pool.h src/include/simclist.h src/include/ks_time.h src/include/ks_q.h src/include/ks_socket.h library_include_HEADERS += src/include/ks_pool.h src/include/simclist.h src/include/ks_time.h src/include/ks_q.h src/include/ks_socket.h
library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include/ks_platform.h src/include/ks_types.h # src/include/ks_rng.h library_include_HEADERS += src/include/ks_dso.h src/include/ks_platform.h src/include/ks_types.h # src/include/ks_rng.h src/include/ks_dht.h
library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h
library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h
library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h
......
差异被折叠。
差异被折叠。
#include "ks_dht.h"
#include "ks_dht-int.h"
#include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
ks_pool_t *pool,
ks_dht_t *dht,
ks_dht_endpoint_t *endpoint,
const ks_sockaddr_t *raddr)
{
ks_dht_datagram_t *dg;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(datagram);
ks_assert(pool);
ks_assert(dht);
ks_assert(endpoint);
ks_assert(raddr);
ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
*datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t));
ks_assert(dg);
dg->pool = pool;
dg->dht = dht;
dg->endpoint = endpoint;
dg->raddr = *raddr;
memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length);
dg->buffer_length = dht->recv_buffer_length;
// done:
if (ret != KS_STATUS_SUCCESS) {
ks_dht_datagram_destroy(datagram);
}
return ret;
}
KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram)
{
ks_dht_datagram_t *dg;
ks_assert(datagram);
ks_assert(*datagram);
dg = *datagram;
ks_pool_free(dg->pool, datagram);
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
#include "ks_dht.h"
#include "ks_dht-int.h"
#include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_distribute_create(ks_dht_distribute_t **distribute,
ks_pool_t *pool,
ks_dht_storageitem_callback_t callback,
void *data,
int64_t cas,
ks_dht_storageitem_t *item)
{
ks_dht_distribute_t *d;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(distribute);
ks_assert(pool);
ks_assert(cas >= 0);
ks_assert(item);
*distribute = d = ks_pool_alloc(pool, sizeof(ks_dht_distribute_t));
ks_assert(d);
d->pool = pool;
d->callback = callback;
d->data = data;
ks_mutex_create(&d->mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
ks_assert(d->mutex);
d->cas = cas;
d->item = item;
ks_dht_storageitem_reference(d->item);
// done:
if (ret != KS_STATUS_SUCCESS) {
if (d) ks_dht_distribute_destroy(distribute);
}
return ret;
}
KS_DECLARE(void) ks_dht_distribute_destroy(ks_dht_distribute_t **distribute)
{
ks_dht_distribute_t *d;
ks_assert(distribute);
ks_assert(*distribute);
d = *distribute;
if (d->mutex) ks_mutex_destroy(&d->mutex);
ks_dht_storageitem_dereference(d->item);
ks_pool_free(d->pool, distribute);
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
#include "ks_dht.h"
#include "ks_dht-int.h"
#include "sodium.h"
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_pool_t *pool,
const ks_sockaddr_t *addr,
ks_socket_t sock)
{
ks_dht_endpoint_t *ep;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(endpoint);
ks_assert(pool);
ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
*endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t));
ks_assert(ep);
ep->pool = pool;
ep->addr = *addr;
ep->sock = sock;
// done:
if (ret != KS_STATUS_SUCCESS) {
if (ep) ks_dht_endpoint_destroy(endpoint);
}
return ret;
}
/**
*
*/
KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint)
{
ks_dht_endpoint_t *ep;
ks_assert(endpoint);
ks_assert(*endpoint);
ep = *endpoint;
if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock);
ks_pool_free(ep->pool, endpoint);
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
#include "ks_dht.h"
#include "ks_dht-int.h"
KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
ks_pool_t *pool,
const ks_sockaddr_t *raddr,
int32_t attempts,
void *data)
{
ks_dht_job_t *j;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(job);
ks_assert(pool);
//ks_assert(dht);
ks_assert(attempts > 0 && attempts <= 10);
*job = j = ks_pool_alloc(pool, sizeof(ks_dht_job_t));
ks_assert(j);
j->pool = pool;
j->state = KS_DHT_JOB_STATE_QUERYING;
if (raddr) j->raddr = *raddr;
j->attempts = attempts;
j->data = data;
// done:
if (ret != KS_STATUS_SUCCESS) {
if (j) ks_dht_job_destroy(job);
}
return ret;
}
KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback)
{
ks_assert(job);
ks_assert(query_callback);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
}
KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(target);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_target = *target;
}
KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
const uint8_t *salt,
ks_size_t salt_length)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(target);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_target = *target;
if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
}
KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(token);
ks_assert(item);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_token = *token;
job->query_cas = cas;
job->query_storageitem = item;
ks_dht_storageitem_reference(job->query_storageitem);
}
KS_DECLARE(void) ks_dht_job_build_search(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback)
{
ks_assert(job);
ks_assert(query_callback);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
}
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
{
ks_dht_job_t *j;
ks_assert(job);
ks_assert(*job);
j = *job;
if (j->query_salt) ben_free(j->query_salt);
if (j->response_id) ks_dhtrt_release_node(j->response_id);
for (int32_t i = 0; i < j->response_nodes_count; ++i) ks_dhtrt_release_node(j->response_nodes[i]);
for (int32_t i = 0; i < j->response_nodes6_count; ++i) ks_dhtrt_release_node(j->response_nodes6[i]);
if (j->query_storageitem) ks_dht_storageitem_dereference(j->query_storageitem);
if (j->response_storageitem) ks_dht_storageitem_dereference(j->response_storageitem);
if (j->error_description) ben_free(j->error_description);
ks_pool_free(j->pool, job);
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
差异被折叠。
#include "ks_dht.h"
#include "ks_dht-int.h"
#include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_publish_create(ks_dht_publish_t **publish,
ks_pool_t *pool,
ks_dht_job_callback_t callback,
void *data,
int64_t cas,
ks_dht_storageitem_t *item)
{
ks_dht_publish_t *p;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(publish);
ks_assert(pool);
ks_assert(cas >= 0);
ks_assert(item);
*publish = p = ks_pool_alloc(pool, sizeof(ks_dht_publish_t));
ks_assert(p);
p->pool = pool;
p->callback = callback;
p->data = data;
p->cas = cas;
p->item = item;
ks_dht_storageitem_reference(p->item);
// done:
if (ret != KS_STATUS_SUCCESS) {
if (p) ks_dht_publish_destroy(publish);
}
return ret;
}
KS_DECLARE(void) ks_dht_publish_destroy(ks_dht_publish_t **publish)
{
ks_dht_publish_t *p;
ks_assert(publish);
ks_assert(*publish);
p = *publish;
ks_dht_storageitem_dereference(p->item);
ks_pool_free(p->pool, publish);
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论