提交 14a99987 authored 作者: Shane Bryldt's avatar Shane Bryldt 提交者: Mike Jerris

FS-9952: Preliminary session negotiations done, added a bunch of logging, fixed…

FS-9952: Preliminary session negotiations done, added a bunch of logging, fixed up cleanup code, needs more testing and more error handling
上级 3d8fd5dc
......@@ -90,6 +90,8 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
*bcP = bc;
ks_log(KS_LOG_DEBUG, "Created\n");
return KS_STATUS_SUCCESS;
}
......@@ -112,6 +114,8 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
ks_pool_free(bc->pool, bcP);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
return KS_STATUS_SUCCESS;
}
......@@ -133,6 +137,8 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_c
return KS_STATUS_FAIL;
}
ks_log(KS_LOG_DEBUG, "Started\n");
return KS_STATUS_SUCCESS;
}
......@@ -153,6 +159,8 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
ks_log(KS_LOG_DEBUG, "Stopped\n");
return KS_STATUS_SUCCESS;
}
......@@ -163,6 +171,13 @@ KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc)
return bc->handle;
}
KS_DECLARE(ks_pool_t *) blade_connection_pool_get(blade_connection_t *bc)
{
ks_assert(bc);
return bc->pool;
}
KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc)
{
ks_assert(bc);
......@@ -285,8 +300,10 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
{
ks_assert(bc);
if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT)
if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT) {
ks_log(KS_LOG_DEBUG, "Connection (%s) disconnecting\n", bc->id);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH);
}
}
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json)
......@@ -342,6 +359,9 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
callback = blade_connection_state_callback_lookup(bc, state);
if (state == BLADE_CONNECTION_STATE_DISCONNECT) {
blade_handle_connections_remove(bc);
}
// @todo only READY state?
if (state != BLADE_CONNECTION_STATE_DETACH && state != BLADE_CONNECTION_STATE_DISCONNECT) {
while (blade_connection_sending_pop(bc, &json) == KS_STATUS_SUCCESS && json) {
......@@ -363,7 +383,9 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
break;
}
if (!(done = (json == NULL))) {
// @todo push json to session receiving queue
blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
ks_assert(bs);
blade_session_receiving_push(bs, json);
cJSON_Delete(json);
json = NULL;
}
......@@ -379,7 +401,8 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) {
switch (state) {
case BLADE_CONNECTION_STATE_DISCONNECT:
return NULL;
blade_connection_destroy(&bc);
break;
case BLADE_CONNECTION_STATE_NEW:
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT);
break;
......@@ -388,24 +411,38 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
break;
case BLADE_CONNECTION_STATE_ATTACH:
{
// @todo this is adding a second lock, since we keep it locked in the callback to allow finishing, we don't want get locking here...
// or just try unlocking twice to confirm...
blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
ks_assert(bs); // should not happen because bs should still be locked
blade_session_connections_add(bs, bc->id);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
blade_session_state_set(bs, BLADE_SESSION_STATE_READY);
blade_session_state_set(bs, BLADE_SESSION_STATE_READY); // @todo only set this if it's not already in the READY state from prior connection
blade_session_read_unlock(bs); // unlock the session we locked obtaining it above
blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching
break;
}
case BLADE_CONNECTION_STATE_DETACH:
// @todo detach from session if this connection is attached
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
break;
{
if (bc->session) {
blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
ks_assert(bs);
blade_session_connections_remove(bs, bc->id);
blade_session_read_unlock(bs);
// keep bc->session for later in case something triggers a reconnect later and needs the old session id for a hint
}
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
break;
}
default: break;
}
}
if (state == BLADE_CONNECTION_STATE_DISCONNECT) break;
}
return NULL;
......
......@@ -127,19 +127,6 @@ KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *u
}
}
// @todo remove this, temporary for testing
ks_log(KS_LOG_DEBUG, " name: %s\n", bi->name);
ks_log(KS_LOG_DEBUG, " domain: %s\n", bi->domain);
ks_log(KS_LOG_DEBUG, " resource: %s\n", bi->resource);
for (ks_hash_iterator_t *it = ks_hash_first(bi->parameters, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
const char *val = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&val);
ks_log(KS_LOG_DEBUG, " key: %s = %s\n", key, val);
}
return KS_STATUS_SUCCESS;
}
......
......@@ -81,6 +81,8 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
*bsP = bs;
ks_log(KS_LOG_DEBUG, "Created\n");
return KS_STATUS_SUCCESS;
}
......@@ -105,6 +107,8 @@ KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP)
ks_pool_free(bs->pool, bsP);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
return KS_STATUS_SUCCESS;
}
......@@ -125,6 +129,8 @@ KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs)
return KS_STATUS_FAIL;
}
ks_log(KS_LOG_DEBUG, "Started\n");
return KS_STATUS_SUCCESS;
}
......@@ -152,6 +158,8 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs)
list_iterator_stop(&bs->connections);
list_clear(&bs->connections);
ks_log(KS_LOG_DEBUG, "Stopped\n");
return KS_STATUS_SUCCESS;
}
......@@ -226,8 +234,17 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs)
{
ks_assert(bs);
if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY)
if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY) {
ks_log(KS_LOG_DEBUG, "Session (%s) hanging up\n", bs->id);
blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP);
}
}
KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs)
{
ks_assert(bs);
return bs->state == BLADE_SESSION_STATE_HANGUP || bs->state == BLADE_SESSION_STATE_DESTROY;
}
KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id)
......@@ -242,6 +259,8 @@ KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const
list_append(&bs->connections, cid);
ks_log(KS_LOG_DEBUG, "Session (%s) connection added (%s)\n", bs->id, id);
return ret;
}
......@@ -256,6 +275,7 @@ KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, co
for (uint32_t i = 0; i < size; ++i) {
const char *cid = (const char *)list_get_at(&bs->connections, i);
if (!strcasecmp(cid, id)) {
ks_log(KS_LOG_DEBUG, "Session (%s) connection removed (%s)\n", bs->id, id);
list_delete_at(&bs->connections, i);
ks_pool_free(bs->pool, &cid);
break;
......@@ -278,7 +298,7 @@ ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, b
// later there will need to be a way to pick which connection to use
cid = list_get_at(&bs->connections, 0);
if (!cid) {
// @todo error logging... this shouldn't happen
// no connections available
return KS_STATUS_FAIL;
}
......@@ -310,6 +330,7 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json)
if (blade_session_connections_choose(bs, json, &bc) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo cache the blade_request_t here if it exists to gaurentee it's cached before a response could be received
blade_connection_sending_push(bc, json);
blade_connection_read_unlock(bc);
}
return KS_STATUS_SUCCESS;
......@@ -334,7 +355,25 @@ KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **j
return ks_q_trypop(bs->sending, (void **)json);
}
// @todo receive queue push and pop
KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json)
{
cJSON *json_copy = NULL;
ks_assert(bs);
ks_assert(json);
json_copy = cJSON_Duplicate(json, 1);
return ks_q_push(bs->receiving, json_copy);
}
KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json)
{
ks_assert(bs);
ks_assert(json);
return ks_q_trypop(bs->receiving, (void **)json);
}
void *blade_session_state_thread(ks_thread_t *thread, void *data)
{
......@@ -354,26 +393,56 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
if (!list_empty(&bs->connections)) {
while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
blade_connection_t *bc = NULL;
if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) blade_connection_sending_push(bc, json);
if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) {
blade_connection_sending_push(bc, json);
blade_connection_read_unlock(bc);
}
cJSON_Delete(json);
}
}
switch (state) {
case BLADE_SESSION_STATE_DESTROY:
ks_log(KS_LOG_DEBUG, "Session (%s) state destroy\n", bs->id);
blade_handle_sessions_remove(bs);
blade_session_destroy(&bs);
return NULL;
case BLADE_SESSION_STATE_HANGUP:
// @todo detach from session if this connection is attached
blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
break;
{
ks_log(KS_LOG_DEBUG, "Session (%s) state hangup\n", bs->id);
list_iterator_start(&bs->connections);
while (list_iterator_hasnext(&bs->connections)) {
const char *cid = (const char *)list_iterator_next(&bs->connections);
blade_connection_t *bc = blade_handle_connections_get(bs->handle, cid);
ks_assert(bc);
blade_connection_disconnect(bc);
blade_connection_read_unlock(bc);
}
list_iterator_stop(&bs->connections);
while (!list_empty(&bs->connections)) ks_sleep(100);
blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
break;
}
case BLADE_SESSION_STATE_CONNECT:
ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id);
ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_ATTACH:
ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id);
ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_DETACH:
ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id);
ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_READY:
// @todo pop from session receiving queue and pass to blade_protocol_process()
ks_log(KS_LOG_DEBUG, "Session (%s) state ready\n", bs->id);
// @todo pop from session receiving queue and pass into protocol layer through something like blade_protocol_process()
ks_sleep_ms(1000);
break;
default: break;
}
......
......@@ -244,8 +244,17 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
blade_request_destroy(&value);
}
// @todo terminate all sessions, which will disconnect all attached connections
for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
blade_session_t *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
ks_hash_remove(bh->requests, key);
blade_session_hangup(value);
}
while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100);
// @todo call onshutdown and onunload callbacks for modules from DSOs, which will unregister transports and disconnect remaining unattached connections
// @todo unload DSOs
......@@ -312,7 +321,7 @@ KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, co
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target)
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_handle_transport_registration_t *bhtr = NULL;
......@@ -358,7 +367,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
// @todo need to be able to get to the blade_module_t from the callbacks, may require envelope around registration of callbacks to include module
// this is required because onconnect transport callback needs to be able to get back to the module data to create the connection being returned
if (bhtr) ret = bhtr->callbacks->onconnect(bcP, bhtr->module, target);
if (bhtr) ret = bhtr->callbacks->onconnect(bcP, bhtr->module, target, session_id);
else ret = KS_STATUS_FAIL;
return ret;
......@@ -423,7 +432,7 @@ KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, cons
{
blade_session_t *bs = NULL;
ks_assert(bs);
ks_assert(bh);
ks_assert(sid);
ks_hash_read_lock(bh->sessions);
......
......@@ -44,6 +44,7 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP);
KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction);
KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc);
KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc);
KS_DECLARE(ks_pool_t *) blade_connection_pool_get(blade_connection_t *bc);
KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc);
KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bool_t block);
KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc);
......
......@@ -49,11 +49,14 @@ KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t
KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs);
KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state);
KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json);
KS_END_EXTERN_C
#endif
......
......@@ -50,7 +50,7 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks);
KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid);
KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc);
......
......@@ -113,7 +113,7 @@ struct blade_module_callbacks_s {
};
typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target);
typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, cJSON *json);
typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json);
......
......@@ -253,7 +253,7 @@ void command_connect(blade_handle_t *bh, char *args)
blade_identity_create(&target, blade_handle_pool_get(bh));
if (blade_identity_parse(target, args) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target);
if (blade_identity_parse(target, args) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target, NULL);
blade_identity_destroy(&target);
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论