提交 14fbb4b9 authored 作者: Anthony Minessale's avatar Anthony Minessale

add io mutex to cache db for optional mutex protection

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@15800 d0543943-73ff-0310-b7d9-9358b9ac24b2
上级 ceb68513
...@@ -1987,6 +1987,7 @@ typedef struct { ...@@ -1987,6 +1987,7 @@ typedef struct {
switch_cache_db_native_handle_t native_handle; switch_cache_db_native_handle_t native_handle;
time_t last_used; time_t last_used;
switch_mutex_t *mutex; switch_mutex_t *mutex;
switch_mutex_t *io_mutex;
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
int32_t flags; int32_t flags;
unsigned long hash; unsigned long hash;
......
...@@ -86,7 +86,7 @@ SWITCH_DECLARE(const char *) switch_core_db_errmsg(switch_core_db_t *db) ...@@ -86,7 +86,7 @@ SWITCH_DECLARE(const char *) switch_core_db_errmsg(switch_core_db_t *db)
SWITCH_DECLARE(int) switch_core_db_exec(switch_core_db_t *db, const char *sql, switch_core_db_callback_func_t callback, void *data, char **errmsg) SWITCH_DECLARE(int) switch_core_db_exec(switch_core_db_t *db, const char *sql, switch_core_db_callback_func_t callback, void *data, char **errmsg)
{ {
int ret = 0; int ret = 0;
int sane = 100; int sane = 300;
char *err = NULL; char *err = NULL;
while (--sane > 0) { while (--sane > 0) {
......
...@@ -35,6 +35,19 @@ ...@@ -35,6 +35,19 @@
#include <switch.h> #include <switch.h>
#include "private/switch_core_pvt.h" #include "private/switch_core_pvt.h"
static struct {
switch_cache_db_handle_t *event_db;
switch_queue_t *sql_queue[2];
switch_memory_pool_t *memory_pool;
switch_event_node_t *event_node;
switch_thread_t *thread;
int thread_running;
switch_bool_t manage;
switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex;
switch_hash_t *dbh_hash;
} sql_manager;
#define SWITCH_CORE_DB "core" #define SWITCH_CORE_DB "core"
/*! /*!
...@@ -43,32 +56,26 @@ ...@@ -43,32 +56,26 @@
SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{ {
switch_cache_db_connection_options_t options = { {0} }; switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (runtime.odbc_dsn && runtime.odbc_user && runtime.odbc_pass) { if (runtime.odbc_dsn && runtime.odbc_user && runtime.odbc_pass) {
options.odbc_options.dsn = runtime.odbc_dsn; options.odbc_options.dsn = runtime.odbc_dsn;
options.odbc_options.user = runtime.odbc_user; options.odbc_options.user = runtime.odbc_user;
options.odbc_options.pass = runtime.odbc_pass; options.odbc_options.pass = runtime.odbc_pass;
return _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
} else { } else {
options.core_db_options.db_path = SWITCH_CORE_DB; options.core_db_options.db_path = SWITCH_CORE_DB;
return _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} }
}
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
}
static struct { return r;
switch_cache_db_handle_t *event_db; }
switch_queue_t *sql_queue[2];
switch_memory_pool_t *memory_pool;
switch_event_node_t *event_node;
switch_thread_t *thread;
int thread_running;
switch_bool_t manage;
} sql_manager;
static switch_mutex_t *dbh_mutex = NULL;
static switch_hash_t *dbh_hash = NULL;
#define SQL_CACHE_TIMEOUT 300 #define SQL_CACHE_TIMEOUT 300
...@@ -81,11 +88,11 @@ static void sql_close(time_t prune) ...@@ -81,11 +88,11 @@ static void sql_close(time_t prune)
int locked = 0; int locked = 0;
char *key; char *key;
switch_mutex_lock(dbh_mutex); switch_mutex_lock(sql_manager.dbh_mutex);
top: top:
locked = 0; locked = 0;
for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
key = (char *) var; key = (char *) var;
...@@ -117,7 +124,7 @@ static void sql_close(time_t prune) ...@@ -117,7 +124,7 @@ static void sql_close(time_t prune)
break; break;
} }
switch_core_hash_delete(dbh_hash, key); switch_core_hash_delete(sql_manager.dbh_hash, key);
switch_mutex_unlock(dbh->mutex); switch_mutex_unlock(dbh->mutex);
switch_core_destroy_memory_pool(&dbh->pool); switch_core_destroy_memory_pool(&dbh->pool);
goto top; goto top;
...@@ -133,7 +140,7 @@ static void sql_close(time_t prune) ...@@ -133,7 +140,7 @@ static void sql_close(time_t prune)
goto top; goto top;
} }
switch_mutex_unlock(dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
} }
...@@ -149,7 +156,7 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t ...@@ -149,7 +156,7 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t
SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t **dbh) SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t **dbh)
{ {
if (dbh && *dbh) { if (dbh && *dbh) {
switch_mutex_lock(dbh_mutex); switch_mutex_lock(sql_manager.dbh_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting DB connection %s\n", (*dbh)->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting DB connection %s\n", (*dbh)->name);
switch ((*dbh)->type) { switch ((*dbh)->type) {
...@@ -167,11 +174,11 @@ SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t ...@@ -167,11 +174,11 @@ SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t
} }
switch_core_hash_delete(dbh_hash, (*dbh)->name); switch_core_hash_delete(sql_manager.dbh_hash, (*dbh)->name);
switch_mutex_unlock((*dbh)->mutex); switch_mutex_unlock((*dbh)->mutex);
switch_core_destroy_memory_pool(&(*dbh)->pool); switch_core_destroy_memory_pool(&(*dbh)->pool);
*dbh = NULL; *dbh = NULL;
switch_mutex_unlock(dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
} }
} }
...@@ -185,9 +192,9 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void) ...@@ -185,9 +192,9 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void)
switch_cache_db_handle_t *dbh = NULL; switch_cache_db_handle_t *dbh = NULL;
snprintf(thread_str, sizeof(thread_str) - 1, "%lu", (unsigned long)(intptr_t)switch_thread_self()); snprintf(thread_str, sizeof(thread_str) - 1, "%lu", (unsigned long)(intptr_t)switch_thread_self());
switch_mutex_lock(dbh_mutex); switch_mutex_lock(sql_manager.dbh_mutex);
for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
key = (char *) var; key = (char *) var;
if ((dbh = (switch_cache_db_handle_t *) val)) { if ((dbh = (switch_cache_db_handle_t *) val)) {
...@@ -202,7 +209,7 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void) ...@@ -202,7 +209,7 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void)
} }
} }
switch_mutex_unlock(dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
} }
SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh, SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
...@@ -246,8 +253,8 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha ...@@ -246,8 +253,8 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha
snprintf(thread_str, sizeof(thread_str) - 1, "%s;thread=\"%lu\"", db_str, (unsigned long)(intptr_t)self); snprintf(thread_str, sizeof(thread_str) - 1, "%s;thread=\"%lu\"", db_str, (unsigned long)(intptr_t)self);
snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line); snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
switch_mutex_lock(dbh_mutex); switch_mutex_lock(sql_manager.dbh_mutex);
if ((new_dbh = switch_core_hash_find(dbh_hash, thread_str))) { if ((new_dbh = switch_core_hash_find(sql_manager.dbh_hash, thread_str))) {
switch_set_string(new_dbh->last_user, db_callsite_str); switch_set_string(new_dbh->last_user, db_callsite_str);
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
"Reuse Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(new_dbh->type)); "Reuse Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(new_dbh->type));
...@@ -260,7 +267,7 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha ...@@ -260,7 +267,7 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha
hash = switch_ci_hashfunc_default(db_str, &hlen); hash = switch_ci_hashfunc_default(db_str, &hlen);
for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
key = (char *) var; key = (char *) var;
...@@ -337,14 +344,14 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha ...@@ -337,14 +344,14 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha
switch_set_string(new_dbh->creator, db_callsite_str); switch_set_string(new_dbh->creator, db_callsite_str);
switch_mutex_lock(new_dbh->mutex); switch_mutex_lock(new_dbh->mutex);
switch_core_hash_insert(dbh_hash, new_dbh->name, new_dbh); switch_core_hash_insert(sql_manager.dbh_hash, new_dbh->name, new_dbh);
} }
end: end:
if (new_dbh) new_dbh->last_used = switch_epoch_time_now(NULL); if (new_dbh) new_dbh->last_used = switch_epoch_time_now(NULL);
switch_mutex_unlock(dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
*dbh = new_dbh; *dbh = new_dbh;
...@@ -357,6 +364,10 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t ...@@ -357,6 +364,10 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
switch_status_t status = SWITCH_STATUS_FALSE; switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL; char *errmsg = NULL;
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
if (err) *err = NULL; if (err) *err = NULL;
switch (dbh->type) { switch (dbh->type) {
...@@ -387,6 +398,11 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t ...@@ -387,6 +398,11 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
} }
} }
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
return status; return status;
} }
...@@ -456,6 +472,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand ...@@ -456,6 +472,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
{ {
switch_status_t status = SWITCH_STATUS_FALSE; switch_status_t status = SWITCH_STATUS_FALSE;
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
switch (dbh->type) { switch (dbh->type) {
default: default:
...@@ -465,6 +484,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand ...@@ -465,6 +484,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
break; break;
} }
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
return status; return status;
} }
...@@ -475,6 +498,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t ...@@ -475,6 +498,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t
switch_status_t status = SWITCH_STATUS_FALSE; switch_status_t status = SWITCH_STATUS_FALSE;
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
switch (dbh->type) { switch (dbh->type) {
case SCDB_TYPE_CORE_DB: case SCDB_TYPE_CORE_DB:
{ {
...@@ -522,6 +549,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t ...@@ -522,6 +549,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t
end: end:
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
return status == SWITCH_STATUS_SUCCESS ? str : NULL; return status == SWITCH_STATUS_SUCCESS ? str : NULL;
} }
...@@ -537,6 +568,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_ ...@@ -537,6 +568,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_
retries = 1000; retries = 1000;
} }
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
while (retries > 0) { while (retries > 0) {
switch_cache_db_execute_sql_real(dbh, sql, &errmsg); switch_cache_db_execute_sql_real(dbh, sql, &errmsg);
if (errmsg) { if (errmsg) {
...@@ -554,6 +589,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_ ...@@ -554,6 +589,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_
} }
} }
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
return status; return status;
} }
...@@ -571,6 +610,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_ ...@@ -571,6 +610,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
retries = 1000; retries = 1000;
} }
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
again: again:
while (begin_retries > 0) { while (begin_retries > 0) {
...@@ -629,6 +672,11 @@ done: ...@@ -629,6 +672,11 @@ done:
switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL); switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
return status; return status;
} }
...@@ -641,6 +689,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach ...@@ -641,6 +689,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
if (err) *err = NULL; if (err) *err = NULL;
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
switch (dbh->type) { switch (dbh->type) {
...@@ -661,6 +712,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach ...@@ -661,6 +712,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
break; break;
} }
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
return status; return status;
} }
...@@ -668,6 +723,11 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh ...@@ -668,6 +723,11 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh
{ {
char *errmsg; char *errmsg;
if (dbh->io_mutex) {
switch_mutex_lock(dbh->io_mutex);
}
switch (dbh->type) { switch (dbh->type) {
case SCDB_TYPE_ODBC: case SCDB_TYPE_ODBC:
{ {
...@@ -707,17 +767,22 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh ...@@ -707,17 +767,22 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh
} }
break; break;
} }
if (dbh->io_mutex) {
switch_mutex_unlock(dbh->io_mutex);
}
} }
#define SQLLEN 1024 * 64 #define SQLLEN 1024 * 1024
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj) static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj)
{ {
void *pop; void *pop;
uint32_t itterations = 0; uint32_t itterations = 0;
uint8_t trans = 0, nothing_in_queue = 0; uint8_t trans = 0, nothing_in_queue = 0;
uint32_t target = 50000; uint32_t target = 100000;
switch_size_t len = 0, sql_len = SQLLEN; switch_size_t len = 0, sql_len = SQLLEN;
char *tmp, *sqlbuf = (char *) malloc(sql_len); char *tmp, *sqlbuf = (char *) malloc(sql_len);
char *sql; char *sql;
...@@ -1175,8 +1240,10 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ ...@@ -1175,8 +1240,10 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
sql_manager.memory_pool = pool; sql_manager.memory_pool = pool;
sql_manager.manage = manage; sql_manager.manage = manage;
switch_mutex_init(&dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_core_hash_init(&dbh_hash, sql_manager.memory_pool); switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool);
top: top:
...@@ -1329,7 +1396,7 @@ void switch_core_sqldb_stop(void) ...@@ -1329,7 +1396,7 @@ void switch_core_sqldb_stop(void)
sql_close(0); sql_close(0);
switch_core_hash_destroy(&dbh_hash); switch_core_hash_destroy(&sql_manager.dbh_hash);
} }
...@@ -1347,9 +1414,9 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) ...@@ -1347,9 +1414,9 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
char *pos1 = NULL; char *pos1 = NULL;
char *pos2 = NULL; char *pos2 = NULL;
switch_mutex_lock(dbh_mutex); switch_mutex_lock(sql_manager.dbh_mutex);
for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
key = (char *) var; key = (char *) var;
...@@ -1385,7 +1452,7 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) ...@@ -1385,7 +1452,7 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
dbh->last_user); dbh->last_user);
} }
} }
switch_mutex_unlock(dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
} }
/* For Emacs: /* For Emacs:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论