提交 2c996b59 authored 作者: Shane Bryldt's avatar Shane Bryldt

FS-10167: Switched connection lifecycle to an isolated pool similar to sessions,…

FS-10167: Switched connection lifecycle to an isolated pool similar to sessions, also refactored the inner WSS transport implementation to be part of the same pool so it is cleaned up with the connection. Switched the connection state machine thread to also utilize thread pool to avoid direct ownership over the thread, similar to session but without the use of a signaled condition due to abstraction of transports which may need to deal with polling for socket events such as with WSS transport. Completely removed the temporary worker thread in the main handle that was for connection/session cleanup.
上级 b84662ae
...@@ -37,11 +37,9 @@ struct blade_connection_s { ...@@ -37,11 +37,9 @@ struct blade_connection_s {
blade_handle_t *handle; blade_handle_t *handle;
ks_pool_t *pool; ks_pool_t *pool;
void *transport_init_data;
void *transport_data; void *transport_data;
blade_transport_callbacks_t *transport_callbacks; blade_transport_callbacks_t *transport_callbacks;
ks_bool_t shutdown;
blade_connection_direction_t direction; blade_connection_direction_t direction;
ks_thread_t *state_thread; ks_thread_t *state_thread;
blade_connection_state_t state; blade_connection_state_t state;
...@@ -63,10 +61,30 @@ ks_status_t blade_connection_state_on_detach(blade_connection_t *bc); ...@@ -63,10 +61,30 @@ ks_status_t blade_connection_state_on_detach(blade_connection_t *bc);
ks_status_t blade_connection_state_on_ready(blade_connection_t *bc); ks_status_t blade_connection_state_on_ready(blade_connection_t *bc);
KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, static void blade_connection_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
blade_handle_t *bh, {
void *transport_init_data, blade_connection_t *bc = (blade_connection_t *)ptr;
blade_transport_callbacks_t *transport_callbacks)
ks_assert(bc);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
blade_connection_shutdown(bc);
break;
case KS_MPCL_DESTROY:
// @todo remove this, it's just for posterity in debugging
bc->sending = NULL;
bc->lock = NULL;
//ks_pool_free(bc->pool, &bc->id);
bc->id = NULL;
break;
}
}
KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, blade_handle_t *bh)
{ {
blade_connection_t *bc = NULL; blade_connection_t *bc = NULL;
ks_pool_t *pool = NULL; ks_pool_t *pool = NULL;
...@@ -74,15 +92,13 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, ...@@ -74,15 +92,13 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
ks_assert(bcP); ks_assert(bcP);
ks_assert(bh); ks_assert(bh);
ks_assert(transport_callbacks);
pool = blade_handle_pool_get(bh); ks_pool_open(&pool);
ks_assert(pool);
bc = ks_pool_alloc(pool, sizeof(blade_connection_t)); bc = ks_pool_alloc(pool, sizeof(blade_connection_t));
bc->handle = bh; bc->handle = bh;
bc->pool = pool; bc->pool = pool;
bc->transport_init_data = transport_init_data;
bc->transport_callbacks = transport_callbacks;
ks_uuid(&id); ks_uuid(&id);
bc->id = ks_uuid_str(pool, &id); bc->id = ks_uuid_str(pool, &id);
...@@ -94,51 +110,48 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, ...@@ -94,51 +110,48 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
ks_q_create(&bc->sending, pool, 0); ks_q_create(&bc->sending, pool, 0);
ks_assert(bc->sending); ks_assert(bc->sending);
*bcP = bc; ks_assert(ks_pool_set_cleanup(pool, bc, NULL, blade_connection_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n"); ks_log(KS_LOG_DEBUG, "Created\n");
*bcP = bc;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP) KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
{ {
blade_connection_t *bc = NULL; blade_connection_t *bc = NULL;
ks_pool_t *pool = NULL;
ks_assert(bcP); ks_assert(bcP);
ks_assert(*bcP); ks_assert(*bcP);
bc = *bcP; bc = *bcP;
blade_connection_shutdown(bc); pool = bc->pool;
//ks_pool_free(bc->pool, bcP);
ks_q_destroy(&bc->sending); ks_pool_close(&pool);
ks_rwl_destroy(&bc->lock);
ks_pool_free(bc->pool, &bc->id);
ks_pool_free(bc->pool, bcP);
ks_log(KS_LOG_DEBUG, "Destroyed\n"); ks_log(KS_LOG_DEBUG, "Destroyed\n");
*bcP = NULL;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction) KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction)
{ {
blade_handle_t *bh = NULL;
ks_assert(bc); ks_assert(bc);
bh = blade_connection_handle_get(bc);
bc->direction = direction; bc->direction = direction;
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
if (ks_thread_create_ex(&bc->state_thread, if (ks_thread_pool_add_job(blade_handle_tpool_get(bh), blade_connection_state_thread, bc) != KS_STATUS_SUCCESS) {
blade_connection_state_thread,
bc,
KS_THREAD_FLAG_DEFAULT,
KS_THREAD_DEFAULT_STACK,
KS_PRI_NORMAL,
bc->pool) != KS_STATUS_SUCCESS) {
// @todo error logging // @todo error logging
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
...@@ -154,14 +167,7 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) ...@@ -154,14 +167,7 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
ks_assert(bc); ks_assert(bc);
if (bc->state_thread) { blade_handle_connections_remove(bc);
bc->shutdown = KS_TRUE;
ks_thread_join(bc->state_thread);
ks_pool_free(bc->pool, &bc->state_thread);
bc->shutdown = KS_FALSE;
}
if (bc->session) ks_pool_free(bc->pool, &bc->session);
while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
...@@ -228,13 +234,6 @@ KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc) ...@@ -228,13 +234,6 @@ KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc)
} }
KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc)
{
ks_assert(bc);
return bc->transport_init_data;
}
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc) KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
{ {
ks_assert(bc); ks_assert(bc);
...@@ -242,11 +241,14 @@ KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc) ...@@ -242,11 +241,14 @@ KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
return bc->transport_data; return bc->transport_data;
} }
KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data) KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data, blade_transport_callbacks_t *transport_callbacks)
{ {
ks_assert(bc); ks_assert(bc);
ks_assert(transport_data);
ks_assert(transport_callbacks);
bc->transport_data = transport_data; bc->transport_data = transport_data;
bc->transport_callbacks = transport_callbacks;
} }
blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state) blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state)
...@@ -356,18 +358,20 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) ...@@ -356,18 +358,20 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
{ {
blade_connection_t *bc = NULL; blade_connection_t *bc = NULL;
blade_connection_state_t state; blade_connection_state_t state;
ks_bool_t shutdown = KS_FALSE;
ks_assert(thread); ks_assert(thread);
ks_assert(data); ks_assert(data);
bc = (blade_connection_t *)data; bc = (blade_connection_t *)data;
while (!bc->shutdown) { while (!shutdown) {
state = bc->state; state = bc->state;
switch (state) { switch (state) {
case BLADE_CONNECTION_STATE_DISCONNECT: case BLADE_CONNECTION_STATE_DISCONNECT:
blade_connection_state_on_disconnect(bc); blade_connection_state_on_disconnect(bc);
shutdown = KS_TRUE;
break; break;
case BLADE_CONNECTION_STATE_NEW: case BLADE_CONNECTION_STATE_NEW:
blade_connection_state_on_new(bc); blade_connection_state_on_new(bc);
...@@ -386,10 +390,10 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) ...@@ -386,10 +390,10 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
break; break;
default: break; default: break;
} }
if (state == BLADE_CONNECTION_STATE_DISCONNECT) break;
} }
blade_connection_destroy(&bc);
return NULL; return NULL;
} }
...@@ -536,7 +540,6 @@ ks_status_t blade_connection_state_on_ready(blade_connection_t *bc) ...@@ -536,7 +540,6 @@ ks_status_t blade_connection_state_on_ready(blade_connection_t *bc)
if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST); if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc); if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
else ks_sleep_ms(1);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
......
...@@ -291,7 +291,7 @@ ks_bool_t blade_chat_join_request_handler(blade_module_t *bm, blade_request_t *b ...@@ -291,7 +291,7 @@ ks_bool_t blade_chat_join_request_handler(blade_module_t *bm, blade_request_t *b
props_participant = cJSON_GetObjectItem(props, "blade.chat.participant"); props_participant = cJSON_GetObjectItem(props, "blade.chat.participant");
if (props_participant && props_participant->type == cJSON_True) { if (props_participant && props_participant->type == cJSON_True) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to join chat but is already a participant\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) attempted to join chat but is already a participant\n", blade_session_id_get(bs));
blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -10000, "Already a participant of chat"); blade_rpc_error_create(&res, NULL, breq->message_id, -10000, "Already a participant of chat");
} else { } else {
ks_log(KS_LOG_DEBUG, "Session (%s) joined chat\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) joined chat\n", blade_session_id_get(bs));
...@@ -300,7 +300,7 @@ ks_bool_t blade_chat_join_request_handler(blade_module_t *bm, blade_request_t *b ...@@ -300,7 +300,7 @@ ks_bool_t blade_chat_join_request_handler(blade_module_t *bm, blade_request_t *b
ks_list_append(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and cleanup when removed ks_list_append(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and cleanup when removed
blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id); blade_rpc_response_create(&res, NULL, breq->message_id);
// @todo create an event to send to participants when a session joins and leaves, send after main response though // @todo create an event to send to participants when a session joins and leaves, send after main response though
} }
...@@ -343,7 +343,7 @@ ks_bool_t blade_chat_leave_request_handler(blade_module_t *bm, blade_request_t * ...@@ -343,7 +343,7 @@ ks_bool_t blade_chat_leave_request_handler(blade_module_t *bm, blade_request_t *
props_participant = cJSON_GetObjectItem(props, "blade.chat.participant"); props_participant = cJSON_GetObjectItem(props, "blade.chat.participant");
if (!props_participant || props_participant->type == cJSON_False) { if (!props_participant || props_participant->type == cJSON_False) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to leave chat but is not a participant\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) attempted to leave chat but is not a participant\n", blade_session_id_get(bs));
blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -10000, "Not a participant of chat"); blade_rpc_error_create(&res, NULL, breq->message_id, -10000, "Not a participant of chat");
} else { } else {
ks_log(KS_LOG_DEBUG, "Session (%s) left chat\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) left chat\n", blade_session_id_get(bs));
...@@ -351,7 +351,7 @@ ks_bool_t blade_chat_leave_request_handler(blade_module_t *bm, blade_request_t * ...@@ -351,7 +351,7 @@ ks_bool_t blade_chat_leave_request_handler(blade_module_t *bm, blade_request_t *
ks_list_delete(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and search manually, also free the id ks_list_delete(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and search manually, also free the id
blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id); blade_rpc_response_create(&res, NULL, breq->message_id);
// @todo create an event to send to participants when a session joins and leaves, send after main response though // @todo create an event to send to participants when a session joins and leaves, send after main response though
} }
...@@ -388,17 +388,17 @@ ks_bool_t blade_chat_send_request_handler(blade_module_t *bm, blade_request_t *b ...@@ -388,17 +388,17 @@ ks_bool_t blade_chat_send_request_handler(blade_module_t *bm, blade_request_t *b
params = cJSON_GetObjectItem(breq->message, "params"); // @todo cache this in blade_request_t for quicker/easier access params = cJSON_GetObjectItem(breq->message, "params"); // @todo cache this in blade_request_t for quicker/easier access
if (!params) { if (!params) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'params' object\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'params' object\n", blade_session_id_get(bs));
blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -32602, "Missing params object"); blade_rpc_error_create(&res, NULL, breq->message_id, -32602, "Missing params object");
} else if (!(message = cJSON_GetObjectCstr(params, "message"))) { } else if (!(message = cJSON_GetObjectCstr(params, "message"))) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'message'\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'message'\n", blade_session_id_get(bs));
blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -32602, "Missing params message string"); blade_rpc_error_create(&res, NULL, breq->message_id, -32602, "Missing params message string");
} }
bs = blade_handle_sessions_get(breq->handle, breq->session_id); bs = blade_handle_sessions_get(breq->handle, breq->session_id);
ks_assert(bs); ks_assert(bs);
if (!res) { if (!res) {
blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id); blade_rpc_response_create(&res, NULL, breq->message_id);
sendevent = KS_TRUE; sendevent = KS_TRUE;
} }
blade_session_send(bs, res, NULL); blade_session_send(bs, res, NULL);
...@@ -408,7 +408,7 @@ ks_bool_t blade_chat_send_request_handler(blade_module_t *bm, blade_request_t *b ...@@ -408,7 +408,7 @@ ks_bool_t blade_chat_send_request_handler(blade_module_t *bm, blade_request_t *b
cJSON_Delete(res); cJSON_Delete(res);
if (sendevent) { if (sendevent) {
blade_rpc_event_create(breq->pool, &event, &res, "blade.chat.message"); blade_rpc_event_create(&event, &res, "blade.chat.message");
ks_assert(event); ks_assert(event);
cJSON_AddStringToObject(res, "from", breq->session_id); // @todo should really be the identity, but we don't have that in place yet cJSON_AddStringToObject(res, "from", breq->session_id); // @todo should really be the identity, but we don't have that in place yet
cJSON_AddStringToObject(res, "message", message); cJSON_AddStringToObject(res, "message", message);
......
...@@ -205,12 +205,11 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(ks_pool_t *pool, cJSON **json, ...@@ -205,12 +205,11 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(ks_pool_t *pool, cJSON **json,
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *id) KS_DECLARE(ks_status_t) blade_rpc_response_create(cJSON **json, cJSON **result, const char *id)
{ {
cJSON *root = NULL; cJSON *root = NULL;
cJSON *r = NULL; cJSON *r = NULL;
ks_assert(pool);
ks_assert(json); ks_assert(json);
ks_assert(id); ks_assert(id);
...@@ -229,12 +228,11 @@ KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json, ...@@ -229,12 +228,11 @@ KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json,
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJSON **error, const char *id, int32_t code, const char *message) KS_DECLARE(ks_status_t) blade_rpc_error_create(cJSON **json, cJSON **error, const char *id, int32_t code, const char *message)
{ {
cJSON *root = NULL; cJSON *root = NULL;
cJSON *e = NULL; cJSON *e = NULL;
ks_assert(pool);
ks_assert(json); ks_assert(json);
//ks_assert(id); //ks_assert(id);
ks_assert(message); ks_assert(message);
...@@ -256,13 +254,12 @@ KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJ ...@@ -256,13 +254,12 @@ KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJ
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_rpc_event_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *event) KS_DECLARE(ks_status_t) blade_rpc_event_create(cJSON **json, cJSON **result, const char *event)
{ {
cJSON *root = NULL; cJSON *root = NULL;
cJSON *b = NULL; cJSON *b = NULL;
cJSON *r = NULL; cJSON *r = NULL;
ks_assert(pool);
ks_assert(json); ks_assert(json);
ks_assert(event); ks_assert(event);
......
...@@ -90,7 +90,8 @@ static void blade_session_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool ...@@ -90,7 +90,8 @@ static void blade_session_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool
bs->mutex = NULL; bs->mutex = NULL;
bs->lock = NULL; bs->lock = NULL;
ks_pool_free(bs->pool, &bs->id); //ks_pool_free(bs->pool, &bs->id);
bs->id = NULL;
break; break;
} }
} }
...@@ -139,7 +140,6 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle ...@@ -139,7 +140,6 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
ks_rwl_create(&bs->properties_lock, pool); ks_rwl_create(&bs->properties_lock, pool);
ks_assert(bs->properties_lock); ks_assert(bs->properties_lock);
ks_assert(ks_pool_set_cleanup(pool, bs, NULL, blade_session_cleanup) == KS_STATUS_SUCCESS); ks_assert(ks_pool_set_cleanup(pool, bs, NULL, blade_session_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n"); ks_log(KS_LOG_DEBUG, "Created\n");
......
...@@ -47,9 +47,6 @@ struct blade_handle_s { ...@@ -47,9 +47,6 @@ struct blade_handle_s {
config_setting_t *config_directory; config_setting_t *config_directory;
config_setting_t *config_datastore; config_setting_t *config_datastore;
ks_thread_t *worker_thread;
ks_bool_t shutdown;
ks_hash_t *transports; // registered transports exposed by modules, NOT active connections ks_hash_t *transports; // registered transports exposed by modules, NOT active connections
ks_hash_t *spaces; // registered method spaces exposed by modules ks_hash_t *spaces; // registered method spaces exposed by modules
// registered event callback registry // registered event callback registry
...@@ -71,8 +68,6 @@ struct blade_handle_s { ...@@ -71,8 +68,6 @@ struct blade_handle_s {
ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id
}; };
void *blade_handle_worker_thread(ks_thread_t *thread, void *data);
typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t; typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t;
struct blade_handle_transport_registration_s { struct blade_handle_transport_registration_s {
ks_pool_t *pool; ks_pool_t *pool;
...@@ -303,17 +298,6 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ ...@@ -303,17 +298,6 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
// @todo load internal modules, call onload and onstartup // @todo load internal modules, call onload and onstartup
if (ks_thread_create_ex(&bh->worker_thread,
blade_handle_worker_thread,
bh,
KS_THREAD_FLAG_DEFAULT,
KS_THREAD_DEFAULT_STACK,
KS_PRI_NORMAL,
bh->pool) != KS_STATUS_SUCCESS) {
// @todo error logging
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -323,17 +307,11 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) ...@@ -323,17 +307,11 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
ks_assert(bh); ks_assert(bh);
while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) { // @todo call onshutdown for internal modules
void *key = NULL;
blade_request_t *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
ks_hash_remove(bh->requests, key);
blade_request_destroy(&value); // @todo repeat the same as below for connections, this will catch all including those that have not yet been attached to a session for edge case cleanup
// @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
}
ks_hash_read_lock(bh->sessions);
for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) { for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL; void *key = NULL;
blade_session_t *value = NULL; blade_session_t *value = NULL;
...@@ -342,9 +320,21 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) ...@@ -342,9 +320,21 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
blade_session_hangup(value); blade_session_hangup(value);
} }
ks_hash_read_unlock(bh->sessions);
while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100); while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100);
// @todo unload internal modules, call onshutdown and onunload
// @todo call onunload for internal modules
while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
void *key = NULL;
blade_request_t *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
ks_hash_remove(bh->requests, key);
blade_request_destroy(&value);
}
while ((it = ks_hash_first(bh->events, KS_UNLOCKED))) { while ((it = ks_hash_first(bh->events, KS_UNLOCKED))) {
void *key = NULL; void *key = NULL;
...@@ -352,7 +342,6 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) ...@@ -352,7 +342,6 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
ks_hash_this(it, (const void **)&key, NULL, (void **)&value); ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
blade_handle_event_unregister(bh, (const char *)key); blade_handle_event_unregister(bh, (const char *)key);
// @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
} }
while ((it = ks_hash_first(bh->spaces, KS_UNLOCKED))) { while ((it = ks_hash_first(bh->spaces, KS_UNLOCKED))) {
...@@ -361,20 +350,12 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) ...@@ -361,20 +350,12 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
ks_hash_this(it, (const void **)&key, NULL, (void **)&value); ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
blade_handle_space_unregister(value); blade_handle_space_unregister(value);
// @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
} }
// @todo unload DSOs // @todo unload DSOs
if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore); if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
if (bh->worker_thread) {
bh->shutdown = KS_TRUE;
ks_thread_join(bh->worker_thread);
ks_pool_free(bh->pool, &bh->worker_thread);
bh->shutdown = KS_FALSE;
}
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -892,61 +873,6 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, ...@@ -892,61 +873,6 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata); return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
} }
void *blade_handle_worker_thread(ks_thread_t *thread, void *data)
{
blade_handle_t *bh = NULL;
blade_connection_t *bc = NULL;
//blade_session_t *bs = NULL;
ks_hash_iterator_t *it = NULL;
ks_q_t *cleanup = NULL;
ks_assert(thread);
ks_assert(data);
bh = (blade_handle_t *)data;
ks_q_create(&cleanup, bh->pool, 0);
ks_assert(cleanup);
while (!bh->shutdown) {
ks_hash_write_lock(bh->connections);
for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
blade_connection_t *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value);
}
ks_hash_write_unlock(bh->connections);
while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) {
blade_handle_connections_remove(bc);
blade_connection_destroy(&bc);
}
//ks_hash_write_lock(bh->sessions);
//for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
// void *key = NULL;
// blade_session_t *value = NULL;
// ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
// if (blade_session_state_get(value) == BLADE_SESSION_STATE_CLEANUP) ks_q_push(cleanup, value);
//}
//ks_hash_write_unlock(bh->sessions);
//while (ks_q_trypop(cleanup, (void **)&bs) == KS_STATUS_SUCCESS) {
// blade_handle_sessions_remove(bs);
// blade_session_destroy(&bs);
//}
ks_sleep_ms(500);
}
return NULL;
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
* mode:c * mode:c
......
...@@ -36,10 +36,7 @@ ...@@ -36,10 +36,7 @@
#include <blade.h> #include <blade.h>
KS_BEGIN_EXTERN_C KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, blade_handle_t *bh);
blade_handle_t *bh,
void *transport_data,
blade_transport_callbacks_t *transport_callbacks);
KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP); KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP);
KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction); KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction);
KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc);
...@@ -50,9 +47,8 @@ KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bo ...@@ -50,9 +47,8 @@ KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bo
KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc);
KS_DECLARE(ks_status_t) blade_connection_write_lock(blade_connection_t *bc, ks_bool_t block); KS_DECLARE(ks_status_t) blade_connection_write_lock(blade_connection_t *bc, ks_bool_t block);
KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc);
KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc);
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data); KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data, blade_transport_callbacks_t *transport_callbacks);
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state); KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc); KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc);
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
......
...@@ -47,9 +47,9 @@ KS_DECLARE(ks_status_t) blade_response_destroy(blade_response_t **bresP); ...@@ -47,9 +47,9 @@ KS_DECLARE(ks_status_t) blade_response_destroy(blade_response_t **bresP);
KS_DECLARE(ks_status_t) blade_event_create(blade_event_t **bevP, blade_handle_t *bh, const char *session_id, cJSON *json); KS_DECLARE(ks_status_t) blade_event_create(blade_event_t **bevP, blade_handle_t *bh, const char *session_id, cJSON *json);
KS_DECLARE(ks_status_t) blade_event_destroy(blade_event_t **bevP); KS_DECLARE(ks_status_t) blade_event_destroy(blade_event_t **bevP);
KS_DECLARE(ks_status_t) blade_rpc_request_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method); KS_DECLARE(ks_status_t) blade_rpc_request_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);
KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *id); KS_DECLARE(ks_status_t) blade_rpc_response_create(cJSON **json, cJSON **result, const char *id);
KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJSON **error, const char *id, int32_t code, const char *message); KS_DECLARE(ks_status_t) blade_rpc_error_create(cJSON **json, cJSON **error, const char *id, int32_t code, const char *message);
KS_DECLARE(ks_status_t) blade_rpc_event_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *event); KS_DECLARE(ks_status_t) blade_rpc_event_create(cJSON **json, cJSON **result, const char *event);
KS_END_EXTERN_C KS_END_EXTERN_C
#endif #endif
......
...@@ -171,7 +171,7 @@ KS_DECLARE(void) ks_sleep(ks_time_t microsec) ...@@ -171,7 +171,7 @@ KS_DECLARE(void) ks_sleep(ks_time_t microsec)
do { do {
QueryPerformanceCounter((LARGE_INTEGER*) &now); QueryPerformanceCounter((LARGE_INTEGER*) &now);
SwitchToThread(); if (!SwitchToThread()) Sleep(1);
} while ((now.QuadPart - start.QuadPart) / (float)(perfCnt.QuadPart) * 1000 * 1000 < (DWORD)microsec); } while ((now.QuadPart - start.QuadPart) / (float)(perfCnt.QuadPart) * 1000 * 1000 < (DWORD)microsec);
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论