提交 1e929e72 authored 作者: Eliot Gable's avatar Eliot Gable

More fixes for reconnecting to PGSQL on connection failure.

上级 e8f3e42f
......@@ -43,7 +43,7 @@
struct switch_pgsql_handle {
char *dsn;
const char *sql;
char *sql;
PGconn* con;
int sock;
switch_pgsql_state_t state;
......@@ -96,6 +96,120 @@ SWITCH_DECLARE(switch_pgsql_handle_t *) switch_pgsql_handle_new(const char *dsn)
return NULL;
}
#ifdef SWITCH_HAVE_PGSQL
static int db_is_up(switch_pgsql_handle_t *handle)
{
int ret = 0;
switch_event_t *event;
char *err_str = NULL;
int max_tries = DEFAULT_PGSQL_RETRIES;
int code = 0, recon = 0;
if (handle) {
max_tries = handle->num_retries;
if (max_tries < 1)
max_tries = DEFAULT_PGSQL_RETRIES;
}
top:
if (!handle) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Handle\n");
goto done;
}
if (!handle->con) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Connection\n");
goto done;
}
/* Try a non-blocking read on the connection to gobble up any EOF from a closed connection and mark the connection BAD if it is closed. */
PQconsumeInput(handle->con);
if (PQstatus(handle->con) == CONNECTION_BAD) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "PQstatus returned bad connection; reconnecting...\n");
handle->state = SWITCH_PGSQL_STATE_ERROR;
PQreset(handle->con);
if (PQstatus(handle->con) == CONNECTION_BAD) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PQstatus returned bad connection -- reconnection failed!\n");
goto error;
}
handle->state = SWITCH_PGSQL_STATE_CONNECTED;
}
/* if (!PQsendQuery(handle->con, "SELECT 1")) {
code = __LINE__;
goto error;
}
if(switch_pgsql_next_result(handle, &result) == SWITCH_PGSQL_FAIL) {
code = __LINE__;
goto error;
}
if (!result || result->status != PGRES_COMMAND_OK) {
code = __LINE__;
goto error;
}
switch_pgsql_free_result(&result);
switch_pgsql_finish_results(handle);
*/
ret = 1;
goto done;
error:
err_str = switch_pgsql_handle_get_error(handle);
if (PQstatus(handle->con) == CONNECTION_BAD) {
handle->state = SWITCH_PGSQL_STATE_ERROR;
PQreset(handle->con);
if (PQstatus(handle->con) == CONNECTION_OK) {
handle->state = SWITCH_PGSQL_STATE_CONNECTED;
recon = SWITCH_PGSQL_SUCCESS;
}
}
max_tries--;
if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Failure-Message", "The sql server is not responding for DSN %s [%s][%d]",
switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The sql server is not responding for DSN %s [%s][%d]\n",
switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
if (recon == SWITCH_PGSQL_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection has been re-established");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "The connection has been re-established\n");
} else {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection could not be re-established");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The connection could not be re-established\n");
}
if (!max_tries) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "Giving up!");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Giving up!\n");
}
switch_event_fire(&event);
}
if (!max_tries) {
goto done;
}
switch_safe_free(err_str);
switch_yield(1000000);
goto top;
done:
switch_safe_free(err_str);
return ret;
}
#endif
SWITCH_DECLARE(void) switch_pgsql_set_num_retries(switch_pgsql_handle_t *handle, int num_retries)
{
#ifdef SWITCH_HAVE_PGSQL
......@@ -117,7 +231,7 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_disconnect(switch_pgsq
PQfinish(handle->con);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Disconnected from [%s]\n", handle->dsn);
}
switch_safe_free(handle->sql);
handle->state = SWITCH_PGSQL_STATE_DOWN;
return SWITCH_PGSQL_SUCCESS;
......@@ -131,13 +245,14 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_send_query(switch_pgsql_handl
#ifdef SWITCH_HAVE_PGSQL
char *err_str;
switch_safe_free(handle->sql);
handle->sql = strdup(sql);
if (!PQsendQuery(handle->con, sql)) {
err_str = switch_pgsql_handle_get_error(handle);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to send query (%s) to database: %s\n", sql, err_str);
switch_pgsql_finish_results(handle);
goto error;
}
handle->sql = sql;
return SWITCH_PGSQL_SUCCESS;
error:
......@@ -159,12 +274,12 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_cancel_real(const char *file,
ret = SWITCH_PGSQL_FAIL;
}
PQfreeCancel(cancel);
/* Make sure the query is fully cancelled */
while (PQgetResult(handle->con) != NULL);
{
PGresult *tmp = NULL;
/* Make sure the query is fully cancelled */
while ((tmp = PQgetResult(handle->con)) != NULL) PQclear(tmp);
}
#endif
return ret;
}
......@@ -197,12 +312,29 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_next_result_timed(switch_pgsq
fds[0].fd = handle->sock;
fds[0].events |= POLLIN;
fds[0].events |= POLLERR;
fds[0].events |= POLLNVAL;
fds[0].events |= POLLHUP;
fds[0].events |= POLLPRI;
fds[0].events |= POLLRDNORM;
fds[0].events |= POLLRDBAND;
/* Wait for the PostgreSQL socket to be ready for data reads. */
if ((poll_res = poll(&fds[0], 1, wait_time)) > -1 ) {
if (fds[0].revents & POLLIN) {
if ((poll_res = poll(&fds[0], 1, wait_time)) > 0 ) {
if (fds[0].revents & POLLHUP || fds[0].revents & POLLNVAL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PGSQL socket closed or invalid while waiting for result for query (%s)\n", handle->sql);
goto error;
} else if (fds[0].revents & POLLERR) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql);
goto error;
} else if (fds[0].revents & POLLIN || fds[0].revents & POLLPRI || fds[0].revents & POLLRDNORM || fds[0].revents & POLLRDBAND) {
/* Then try to consume any input waiting. */
if (PQconsumeInput(handle->con)) {
if (PQstatus(handle->con) == CONNECTION_BAD) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection terminated while waiting for result.\n");
handle->state = SWITCH_PGSQL_STATE_ERROR;
goto error;
}
/* And check to see if we have a full result ready for reading */
if (!PQisBusy(handle->con)) {
/* If we can pull a full result without blocking, then break this loop */
......@@ -216,11 +348,8 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_next_result_timed(switch_pgsq
switch_pgsql_cancel(handle);
goto error;
}
} else if (fds[0].revents & POLLERR) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql);
goto error;
}
} else {
} else if (poll_res == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql);
goto error;
}
......@@ -233,8 +362,6 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_next_result_timed(switch_pgsq
goto error;
}
}
} else {
/* If we had an error trying to consume input, report it and cancel the query. */
......@@ -298,6 +425,14 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_next_result_timed(switch_pgsq
return SWITCH_PGSQL_SUCCESS;
error:
{
PGresult *tmp = NULL;
/* Make sure the failed connection does not have any transactions marked as in progress */
while ((tmp = PQgetResult(handle->con)) != NULL) PQclear(tmp);
/* Try to reconnect to the DB if we were dropped */
db_is_up(handle);
}
#endif
return SWITCH_PGSQL_FAIL;
}
......@@ -341,118 +476,6 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_finish_results_real(const cha
#endif
}
#ifdef SWITCH_HAVE_PGSQL
static int db_is_up(switch_pgsql_handle_t *handle)
{
int ret = 0;
switch_event_t *event;
char *err_str = NULL;
int max_tries = DEFAULT_PGSQL_RETRIES;
int code = 0, recon = 0;
if (handle) {
max_tries = handle->num_retries;
if (max_tries < 1)
max_tries = DEFAULT_PGSQL_RETRIES;
}
top:
if (!handle) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Handle\n");
goto done;
}
if (!handle->con) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Connection\n");
goto done;
}
/* Try a non-blocking read on the connection to gobble up any EOF from a closed connection and mark the connection BAD if it is closed. */
PQconsumeInput(handle->con);
if (PQstatus(handle->con) == CONNECTION_BAD) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "PQstatus returned bad connection; reconnecting...\n");
handle->state = SWITCH_PGSQL_STATE_ERROR;
PQreset(handle->con);
if (PQstatus(handle->con) == CONNECTION_BAD) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PQstatus returned bad connection -- reconnection failed!\n");
goto error;
}
handle->state = SWITCH_PGSQL_STATE_CONNECTED;
}
/* if (!PQsendQuery(handle->con, "SELECT 1")) {
code = __LINE__;
goto error;
}
if(switch_pgsql_next_result(handle, &result) == SWITCH_PGSQL_FAIL) {
code = __LINE__;
goto error;
}
if (!result || result->status != PGRES_COMMAND_OK) {
code = __LINE__;
goto error;
}
switch_pgsql_free_result(&result);
switch_pgsql_finish_results(handle);
*/
ret = 1;
goto done;
error:
err_str = switch_pgsql_handle_get_error(handle);
if (PQstatus(handle->con) == CONNECTION_BAD) {
handle->state = SWITCH_PGSQL_STATE_ERROR;
PQreset(handle->con);
if (PQstatus(handle->con) == CONNECTION_OK) {
handle->state = SWITCH_PGSQL_STATE_CONNECTED;
recon = SWITCH_PGSQL_SUCCESS;
}
}
max_tries--;
if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Failure-Message", "The sql server is not responding for DSN %s [%s][%d]",
switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The sql server is not responding for DSN %s [%s][%d]\n",
switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
if (recon == SWITCH_PGSQL_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection has been re-established");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "The connection has been re-established\n");
} else {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection could not be re-established");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The connection could not be re-established\n");
}
if (!max_tries) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "Giving up!");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Giving up!\n");
}
switch_event_fire(&event);
}
if (!max_tries) {
goto done;
}
switch_safe_free(err_str);
switch_yield(1000000);
goto top;
done:
switch_safe_free(err_str);
return ret;
}
#endif
SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_connect(switch_pgsql_handle_t *handle)
{
......@@ -557,11 +580,14 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_exec_base_detailed(con
if (handle->auto_commit == SWITCH_FALSE && handle->in_txn == SWITCH_FALSE) {
if (switch_pgsql_send_query(handle, "BEGIN") != SWITCH_PGSQL_SUCCESS) {
er = strdup("Error sending BEGIN!");
switch_pgsql_finish_results(handle);
if (switch_pgsql_finish_results(handle) != SWITCH_PGSQL_SUCCESS) {
db_is_up(handle); /* If finish_results failed, maybe the db went dead */
}
goto error;
}
if (switch_pgsql_finish_results(handle) != SWITCH_PGSQL_SUCCESS) {
db_is_up(handle);
er = strdup("Error sending BEGIN!");
goto error;
}
......@@ -570,7 +596,9 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_exec_base_detailed(con
if (switch_pgsql_send_query(handle, sql) != SWITCH_PGSQL_SUCCESS) {
er = strdup("Error sending query!");
switch_pgsql_finish_results(handle);
if (switch_pgsql_finish_results(handle) != SWITCH_PGSQL_SUCCESS) {
db_is_up(handle);
}
goto error;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论