提交 5db04d7d authored 作者: Chris Rienzo's avatar Chris Rienzo

FS-9965 [mod_hiredis] Improvements for performance, memory utilization, and resiliency.

   Pipelining of requests to improve throughput
     -- new parameter max-pipelined-requests (default 20) for maximum number of requests to batch at one time

   Deletion of counter keys when zero
     -- new parameter delete-when-zero (default false) to enable.  This will cause a redis eval to execute to decrement counters instead of DECR.

   Detection of negative limit counters
     -- Self healing of negative counters (due to key eviction, etc)
上级 73e08c17
...@@ -4,7 +4,7 @@ MODNAME=mod_hiredis ...@@ -4,7 +4,7 @@ MODNAME=mod_hiredis
if HAVE_HIREDIS if HAVE_HIREDIS
mod_LTLIBRARIES = mod_hiredis.la mod_LTLIBRARIES = mod_hiredis.la
mod_hiredis_la_SOURCES = mod_hiredis.c hiredis_utils.c hiredis_profile.c mod_hiredis_la_SOURCES = mod_hiredis.c hiredis_utils.c hiredis_profile.c hiredis_pipeline.c
mod_hiredis_la_CFLAGS = $(AM_CFLAGS) $(HIREDIS_CFLAGS) mod_hiredis_la_CFLAGS = $(AM_CFLAGS) $(HIREDIS_CFLAGS)
mod_hiredis_la_LIBADD = $(switch_builddir)/libfreeswitch.la mod_hiredis_la_LIBADD = $(switch_builddir)/libfreeswitch.la
mod_hiredis_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(HIREDIS_LIBS) $(SWITCH_AM_LDFLAGS) mod_hiredis_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(HIREDIS_LIBS) $(SWITCH_AM_LDFLAGS)
......
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2016, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Christopher Rienzo <chris.rienzo@citrix.com>
*
* hiredis_pipeline.c -- batched operations to redis
*
*/
#include <mod_hiredis.h>
/**
* Thread that processes redis requests
* @param thread this thread
* @param obj the profile
* @return NULL
*/
static void *SWITCH_THREAD_FUNC pipeline_thread(switch_thread_t *thread, void *obj)
{
hiredis_profile_t *profile = (hiredis_profile_t *)obj;
switch_thread_rwlock_rdlock(profile->pipeline_lock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Redis pipeline thread started for [%s]\n", profile->name);
while ( profile->pipeline_running || switch_queue_size(profile->active_requests) > 0 ) {
void *val = NULL;
if ( switch_queue_pop_timeout(profile->active_requests, &val, 500 * 1000) == SWITCH_STATUS_SUCCESS && val ) {
int request_count = 1;
hiredis_request_t *requests = (hiredis_request_t *)val;
hiredis_request_t *cur_request = requests;
cur_request->next = NULL;
/* This would be easier to code in reverse order, but I prefer to execute requests in the order that they arrive */
while ( request_count < profile->max_pipelined_requests ) {
if ( switch_queue_trypop(profile->active_requests, &val) == SWITCH_STATUS_SUCCESS && val ) {
request_count++;
cur_request = cur_request->next = (hiredis_request_t *)val;
cur_request->next = NULL;
} else {
break;
}
}
hiredis_profile_execute_requests(profile, NULL, requests);
cur_request = requests;
while ( cur_request ) {
hiredis_request_t *next_request = cur_request->next; /* done here to prevent race with waiter */
if ( cur_request->response ) {
/* signal waiter */
switch_mutex_lock(cur_request->mutex);
cur_request->done = 1;
switch_thread_cond_signal(cur_request->cond);
switch_mutex_unlock(cur_request->mutex);
} else {
/* nobody to signal, clean it up */
switch_safe_free(cur_request->request);
switch_safe_free(cur_request->keys);
switch_safe_free(cur_request->session_uuid);
switch_queue_trypush(profile->request_pool, cur_request);
}
cur_request = next_request;
}
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Redis pipeline thread ended for [%s]\n", profile->name);
switch_thread_rwlock_unlock(profile->pipeline_lock);
return NULL;
}
/**
* Add a pipeline thread to the profile's thread pool
*/
void hiredis_pipeline_thread_start(hiredis_profile_t *profile)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
profile->pipeline_running = 1;
switch_threadattr_create(&thd_attr, profile->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, pipeline_thread, profile, profile->pool);
}
/**
* Wait for all pipeline threads to complete
*/
void hiredis_pipeline_threads_stop(hiredis_profile_t *profile)
{
if ( profile->pipeline_running ) {
profile->pipeline_running = 0;
switch_queue_interrupt_all(profile->active_requests);
switch_thread_rwlock_wrlock(profile->pipeline_lock);
}
}
/**
* Execute pipelined request and wait for response.
* @param profile to use
* @param session (optional)
* @param request - the request
* @return status SWITCH_STATUS_SUCCESS if successful
*/
static switch_status_t hiredis_profile_execute_pipeline_request(hiredis_profile_t *profile, switch_core_session_t *session, hiredis_request_t *request)
{
switch_status_t status;
/* send request to thread pool */
if ( profile->pipeline_running && switch_queue_trypush(profile->active_requests, request) == SWITCH_STATUS_SUCCESS ) {
if ( request->response ) {
/* wait for response */
switch_mutex_lock(request->mutex);
while ( !request->done ) {
switch_thread_cond_timedwait(request->cond, request->mutex, 1000 * 1000);
}
/* get response */
switch_mutex_unlock(request->mutex);
status = request->status;
/* save back to pool */
switch_queue_trypush(profile->request_pool, request);
} else {
status = SWITCH_STATUS_SUCCESS;
}
} else {
/* failed... do sync request instead */
status = hiredis_profile_execute_sync(profile, session, request->response, request->request);
if ( !request->response ) {
switch_safe_free(request->request);
switch_safe_free(request->keys);
switch_safe_free(request->session_uuid);
}
switch_queue_trypush(profile->request_pool, request);
}
return status;
}
/**
* Execute pipelined request and wait for response.
* @param profile to use
* @param session (optional)
* @param resp (optional) - if no resp, this function will not wait for the result
* @param request_string - the request
* @return status SWITCH_STATUS_SUCCESS if successful
*/
static switch_status_t hiredis_profile_execute_pipeline(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *request_string)
{
void *val = NULL;
hiredis_request_t *request = NULL;
if (switch_queue_trypop(profile->request_pool, &val) == SWITCH_STATUS_SUCCESS && val) {
request = (hiredis_request_t *)val;
} else {
request = switch_core_alloc(profile->pool, sizeof(*request));
switch_thread_cond_create(&request->cond, profile->pool);
switch_mutex_init(&request->mutex, SWITCH_MUTEX_UNNESTED, profile->pool);
}
request->response = resp;
request->done = 0;
request->do_eval = 0;
request->num_keys = 0;
request->keys = NULL;
request->status = SWITCH_STATUS_SUCCESS;
request->next = NULL;
request->session_uuid = NULL;
if ( resp ) {
/* will block, no need to dup memory */
request->request = (char *)request_string;
if ( session ) {
request->session_uuid = switch_core_session_get_uuid(session);
}
} else {
/* fire and forget... need to dup memory */
request->request = strdup(request_string);
if ( session ) {
request->session_uuid = strdup(switch_core_session_get_uuid(session));
}
}
return hiredis_profile_execute_pipeline_request(profile, session, request);
}
/**
* Execute pipelined eval and wait for response.
* @param profile to use
* @param session (optional)
* @param resp (optional) - if no resp, this function will not wait for the result
* @param script
* @param num_keys
* @param keys
* @return status SWITCH_STATUS_SUCCESS if successful
*/
switch_status_t hiredis_profile_eval_pipeline(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *script, int num_keys, const char *keys)
{
void *val = NULL;
hiredis_request_t *request = NULL;
if (switch_queue_trypop(profile->request_pool, &val) == SWITCH_STATUS_SUCCESS && val) {
request = (hiredis_request_t *)val;
} else {
request = switch_core_alloc(profile->pool, sizeof(*request));
switch_thread_cond_create(&request->cond, profile->pool);
switch_mutex_init(&request->mutex, SWITCH_MUTEX_UNNESTED, profile->pool);
}
request->response = resp;
request->done = 0;
request->do_eval = 1;
request->num_keys = num_keys;
request->status = SWITCH_STATUS_SUCCESS;
request->next = NULL;
request->session_uuid = NULL;
if ( resp ) {
/* will block, no need to dup memory */
request->request = (char *)script;
request->keys = (char *)keys;
if ( session ) {
request->session_uuid = switch_core_session_get_uuid(session);
}
} else {
/* fire and forget... need to dup memory */
request->request = strdup(script);
request->keys = strdup(keys);
if ( session ) {
request->session_uuid = strdup(switch_core_session_get_uuid(session));
}
}
return hiredis_profile_execute_pipeline_request(profile, session, request);
}
/**
* Execute pipelined request and wait for response.
* @param profile to use
* @param session (optional)
* @param resp (optional) - if no resp, this function will not wait for the result
* @param format_string - the request
* @return status SWITCH_STATUS_SUCCESS if successful
*/
switch_status_t hiredis_profile_execute_pipeline_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *format_string, ...)
{
switch_status_t result = SWITCH_STATUS_GENERR;
char *request = NULL;
va_list ap;
int ret;
va_start(ap, format_string);
ret = switch_vasprintf(&request, format_string, ap);
va_end(ap);
if ( ret != -1 ) {
result = hiredis_profile_execute_pipeline(profile, session, resp, request);
}
switch_safe_free(request);
return result;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/
...@@ -47,6 +47,8 @@ switch_status_t mod_hiredis_do_config() ...@@ -47,6 +47,8 @@ switch_status_t mod_hiredis_do_config()
hiredis_profile_t *new_profile = NULL; hiredis_profile_t *new_profile = NULL;
uint8_t ignore_connect_fail = 0; uint8_t ignore_connect_fail = 0;
uint8_t ignore_error = 0; uint8_t ignore_error = 0;
int max_pipelined_requests = 0;
int delete_when_zero = 0;
char *name = (char *) switch_xml_attr_soft(profile, "name"); char *name = (char *) switch_xml_attr_soft(profile, "name");
// Load params // Load params
...@@ -57,11 +59,19 @@ switch_status_t mod_hiredis_do_config() ...@@ -57,11 +59,19 @@ switch_status_t mod_hiredis_do_config()
ignore_connect_fail = switch_true(switch_xml_attr_soft(param, "value")); ignore_connect_fail = switch_true(switch_xml_attr_soft(param, "value"));
} else if ( !strncmp(var, "ignore-error", 12) ) { } else if ( !strncmp(var, "ignore-error", 12) ) {
ignore_error = switch_true(switch_xml_attr_soft(param, "value")); ignore_error = switch_true(switch_xml_attr_soft(param, "value"));
} else if ( !strncmp(var, "max-pipelined-requests", 22) ) {
max_pipelined_requests = atoi(switch_xml_attr_soft(param, "value"));
} else if ( !strncmp(var, "delete-when-zero", 16) ) {
delete_when_zero = switch_true(switch_xml_attr_soft(param, "value"));
} }
} }
} }
if ( hiredis_profile_create(&new_profile, name, ignore_connect_fail, ignore_error) == SWITCH_STATUS_SUCCESS ) { if (max_pipelined_requests <= 0) {
max_pipelined_requests = 20;
}
if ( hiredis_profile_create(&new_profile, name, ignore_connect_fail, ignore_error, max_pipelined_requests, delete_when_zero) == SWITCH_STATUS_SUCCESS ) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Created profile[%s]\n", name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Created profile[%s]\n", name);
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create profile[%s]\n", name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create profile[%s]\n", name);
......
...@@ -6,52 +6,85 @@ ...@@ -6,52 +6,85 @@
#include <hiredis/hiredis.h> #include <hiredis/hiredis.h>
typedef struct mod_hiredis_global_s { typedef struct mod_hiredis_global_s {
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
switch_hash_t *profiles; switch_hash_t *profiles;
switch_mutex_t *limit_pvt_mutex;
} mod_hiredis_global_t; } mod_hiredis_global_t;
extern mod_hiredis_global_t mod_hiredis_globals; extern mod_hiredis_global_t mod_hiredis_globals;
typedef struct hiredis_request_s {
char *request;
char **response;
int done;
int do_eval;
int num_keys;
char *keys;
char *session_uuid;
switch_status_t status;
switch_mutex_t *mutex;
switch_thread_cond_t *cond;
struct hiredis_request_s *next;
} hiredis_request_t;
typedef struct mod_hiredis_context_s { typedef struct mod_hiredis_context_s {
struct hiredis_connection_s *connection; struct hiredis_connection_s *connection;
redisContext *context; redisContext *context;
} hiredis_context_t; } hiredis_context_t;
typedef struct hiredis_connection_s { typedef struct hiredis_connection_s {
char *host; char *host;
char *password; char *password;
uint32_t port; uint32_t port;
switch_interval_time_t timeout_us; switch_interval_time_t timeout_us;
struct timeval timeout; struct timeval timeout;
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
switch_queue_t *context_pool; switch_queue_t *context_pool;
struct hiredis_connection_s *next; struct hiredis_connection_s *next;
} hiredis_connection_t; } hiredis_connection_t;
typedef struct hiredis_profile_s { typedef struct hiredis_profile_s {
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
char *name; char *name;
uint8_t ignore_connect_fail; uint8_t ignore_connect_fail;
uint8_t ignore_error; uint8_t ignore_error;
hiredis_connection_t *conn_head;
hiredis_connection_t *conn_head; switch_thread_rwlock_t *pipeline_lock;
switch_queue_t *request_pool;
switch_queue_t *active_requests;
int pipeline_running;
int max_pipelined_requests;
int delete_when_zero;
} hiredis_profile_t; } hiredis_profile_t;
typedef struct hiredis_limit_pvt_node_s {
char *realm;
char *resource;
char *limit_key;
int inc;
int interval;
struct hiredis_limit_pvt_node_s *next;
} hiredis_limit_pvt_node_t;
typedef struct hiredis_limit_pvt_s { typedef struct hiredis_limit_pvt_s {
char *realm; switch_mutex_t *mutex;
char *resource; struct hiredis_limit_pvt_node_s *first;
char *limit_key;
int inc;
int interval;
struct hiredis_limit_pvt_s *next;
} hiredis_limit_pvt_t; } hiredis_limit_pvt_t;
switch_status_t mod_hiredis_do_config(void); switch_status_t mod_hiredis_do_config(void);
switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t ignore_connect_fail, uint8_t ignore_error); switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t ignore_connect_fail, uint8_t ignore_error, int max_pipelined_requests, int delete_when_zero);
switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile); switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile);
switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_connections); switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_connections);
switch_status_t hiredis_profile_execute_requests(hiredis_profile_t *profile, switch_core_session_t *session, hiredis_request_t *requests);
switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *data);
switch_status_t hiredis_profile_execute_sync_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *data_format_string, ...);
switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **response, switch_core_session_t *session); void hiredis_pipeline_thread_start(hiredis_profile_t *profile);
void hiredis_pipeline_threads_stop(hiredis_profile_t *profile);
switch_status_t hiredis_profile_execute_pipeline_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *data_format_string, ...);
switch_status_t hiredis_profile_eval_pipeline(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *script, int num_keys, const char *keys);
#endif /* MOD_HIREDIS_H */ #endif /* MOD_HIREDIS_H */
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论