提交 94921247 authored 作者: Travis Cross's avatar Travis Cross

Merge in mod_fifo documentation and refactoring

In this branch we made a pass through the code commenting on and
documenting the most important bits, and cleaning up and refactoring
where possible while maintaining existing behavior.

Thanks to Anthony, Mike, and Brian for explaining some of the history
of and rationale for what's in tree.
/*
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
*
......@@ -22,8 +22,9 @@
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
*
* Anthony Minessale II <anthm@freeswitch.org>
* Travis Cross <tc@traviscross.com>
*
* mod_fifo.c -- FIFO
*
......@@ -35,6 +36,69 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown);
SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load);
SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
/*!\file
* # Theory of operation
*
* ## Kinds of things
*
* The fifo systems deals in various kinds of things: /fifos nodes/,
* /queues/, /(inbound) callers/, /outbound callers/, /consumers/, and
* /(outbound) members/.
*
* /fifo nodes/ accept callers and work to deliver those callers to
* consumers and members. The nodes contain an array of /queues/
* indexed by a priority value.
*
* /queues/ contain an array of callers treated as a list.
*
* /callers/ are the channels placed into a fifo node's queue for
* eventual delivery to consumers and members.
*
* /outbound callers/ are persons waiting to be called back via a
* dial string.
*
* /consumers/ are channels for agents who have dialed in to one or
* more fifos and will have callers connected to them.
*
* /members/ are agents who we'll place calls to via a dial string to
* attempt to deliver callers.
*
* An /agent/ may refer to either a /consumer/ or an /member/.
*
* ## Outbound Strategies
*
* An outbound strategy defines the way in which we attempt to deliver
* callers to members.
*
* The default strategy, /ringall/, preserves the caller ID of the
* caller being delivered. Because this means we must choose a caller
* before we place the call to the member, this impacts the order in
* which calls are delivered and the rate at which we can deliver
* those calls.
*
* The /enterprise/ outbound strategy does not preserve the caller ID
* of the caller thereby allowing deliver of callers to agents at the
* fastest possible rate.
*
* ## Manual calls
*
* The fifo system provides a way to prevent members on non-fifo calls
* from receiving a call from the fifo system. We do this by tracking
* non-fifo calls in a special fifo named `manual_calls`. When
* creating a channel for an agent we set the channel variable
* `fifo_outbound_uuid` to an arbitrary unique value for that agent,
* then call `fifo_track_call`. For the corresponding member we must
* also set `{fifo_outbound_uuid=}` to the same value.
*
* ## Importance
*
* Importance is a value 0-9 that can be associated with a fifo. The
* default value is 0. If the fifo is being dynamically created the
* importance of the fifo can be set when calling the `fifo`
* application. If the fifo already exists the importance value
* passed to the fifo application will be ignored.
*/
#define MANUAL_QUEUE_NAME "manual_calls"
#define FIFO_EVENT "fifo::info"
......@@ -47,10 +111,16 @@ typedef enum {
NODE_STRATEGY_ENTERPRISE
} outbound_strategy_t;
static outbound_strategy_t default_strategy = NODE_STRATEGY_RINGALL;
static int marker = 1;
/*!\struct fifo_queue_t
* \brief Queue of callers
*
* Callers are placed into a queue as events in `data` which is an
* array of such events. The array size is hard-coded as 1000
* elements.
*
* Fifo nodes are composed of an array of these queues representing
* each priority level of the fifo.
*/
typedef struct {
int nelm;
int idx;
......@@ -65,8 +135,6 @@ typedef enum {
FIFO_APP_DID_HOOK = (1 << 2)
} fifo_app_flag_t;
static int check_caller_outbound_call(const char *key);
static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
static void del_caller_outbound_call(const char *key);
......@@ -76,12 +144,10 @@ static void add_consumer_outbound_call(const char *key, switch_call_cause_t *can
static void del_consumer_outbound_call(const char *key);
static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause);
static int check_bridge_call(const char *key);
static void add_bridge_call(const char *key);
static void del_bridge_call(const char *key);
switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
{
fifo_queue_t *q;
......@@ -97,7 +163,6 @@ switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_
return SWITCH_STATUS_SUCCESS;
}
static void change_pos(switch_event_t *event, int pos)
{
const char *uuid = switch_event_get_header(event, "unique-id");
......@@ -106,37 +171,26 @@ static void change_pos(switch_event_t *event, int pos)
char tmp[30] = "";
if (zstr(uuid)) return;
if (!(session = switch_core_session_locate(uuid))) {
return;
}
channel = switch_core_session_get_channel(session);
switch_snprintf(tmp, sizeof(tmp), "%d", pos);
switch_channel_set_variable(channel, "fifo_position", tmp);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "fifo_position", tmp);
switch_core_session_rwunlock(session);
}
static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
{
switch_mutex_lock(queue->mutex);
if (queue->idx == queue->nelm) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
queue->data[queue->idx++] = ptr;
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
static int fifo_queue_size(fifo_queue_t *queue)
......@@ -145,10 +199,15 @@ static int fifo_queue_size(fifo_queue_t *queue)
switch_mutex_lock(queue->mutex);
s = queue->idx;
switch_mutex_unlock(queue->mutex);
return s;
}
/*!
* \param remove Whether to remove the popped event from the queue
* If remove is 0, do not remove the popped event. If it is 1,
* remove it if it is not an event for an outbound caller. If it is
* 2, always remove it.
*/
static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove)
{
int i, j;
......@@ -188,12 +247,19 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
}
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
/*!\brief Remove matching event from queue
*
* Each event in the queue will be checked to see whether it has a
* header equal to name whose value is equal to val. If it does, that
* event will be returned unless the event is for an outbound caller.
* If name starts with '+' or remove == 2 then forcing is enabled and
* the event will be returned in any case. If remove > 0 then the
* returned event will be removed from the queue and the remaining
* elements shifted to make them contiguous.
*/
static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove)
{
int i, j, force = 0;
......@@ -218,7 +284,6 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n
const char *j_val = switch_event_get_header(queue->data[j], name);
const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) {
if (remove) {
*pop = queue->data[j];
} else {
......@@ -248,6 +313,12 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n
return SWITCH_STATUS_SUCCESS;
}
/*!\brief Destroy event with given uuid and remove it from queue
*
* Elements of the queue are searched until a matching uuid is found.
* That uuid is then destroyed and removed from the queue. The
* remaining elements are shifted to make them contiguous.
*/
static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
{
int i, j;
......@@ -283,11 +354,20 @@ static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
/*!\struct fifo_node
*
* \var fifo_node::outbound_name
* \brief Name of fifo in caller ID
*
* For the ringall strategy, this value is a prefix for the
* caller ID shown to agents. If the value starts with '=' then the
* actual caller ID is replaced completely.
*
* For the enterprise strategy, this value is used instead of the
* queue name in the caller ID.
*/
struct fifo_node {
char *name;
switch_mutex_t *mutex;
......@@ -322,8 +402,6 @@ typedef struct fifo_node fifo_node_t;
static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session);
static void fifo_caller_del(const char *uuid);
struct callback {
char *buf;
size_t len;
......@@ -331,7 +409,7 @@ struct callback {
};
typedef struct callback callback_t;
static const char *strat_parse(outbound_strategy_t s)
static const char *print_strategy(outbound_strategy_t s)
{
switch (s) {
case NODE_STRATEGY_RINGALL:
......@@ -345,7 +423,7 @@ static const char *strat_parse(outbound_strategy_t s)
return "invalid";
}
static outbound_strategy_t parse_strat(const char *name)
static outbound_strategy_t parse_strategy(const char *name)
{
if (!strcasecmp(name, "ringall")) {
return NODE_STRATEGY_RINGALL;
......@@ -367,16 +445,19 @@ static int sql2str_callback(void *pArg, int argc, char **argv, char **columnName
return 0;
}
static switch_bool_t match_key(const char *caller_exit_key, char key)
{
while (caller_exit_key && *caller_exit_key) {
if (*caller_exit_key++ == key) {
return SWITCH_TRUE;
}
}
return SWITCH_FALSE;
}
/*!\brief Handler for consumer DTMF
*
* When `fifo_consumer_exit_key` is pressed by the consumer we hangup
* on the caller (unless we've put the caller on hold). The default
* exit key is '*'.
*
* When the consumer presses '0' we put both legs on hold and play
* hold music as follows. To the caller we play `fifo_music` or the
* default hold music for the channel. To the consumer we play
* `fifo_hold_music`, or `fifo_music`, or the default hold music for
* the channel. The consumer can press '0' again to pick up the
* caller from hold.
*/
static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
{
switch_core_session_t *bleg = (switch_core_session_t *) buf;
......@@ -388,13 +469,10 @@ static switch_status_t on_dtmf(switch_core_session_t *session, void *input, swit
switch_channel_t *bchan = switch_core_session_get_channel(bleg);
switch_channel_t *channel = switch_core_session_get_channel(session);
const char *consumer_exit_key = switch_channel_get_variable(channel, "fifo_consumer_exit_key");
if (switch_channel_test_flag(switch_core_session_get_channel(session), CF_BRIDGE_ORIGINATOR)) {
if (consumer_exit_key && dtmf->digit == *consumer_exit_key) {
switch_channel_hangup(bchan, SWITCH_CAUSE_NORMAL_CLEARING);
return SWITCH_STATUS_BREAK;
} else if (!consumer_exit_key && dtmf->digit == '*') {
const char *consumer_exit_key = switch_channel_get_variable(channel, "fifo_consumer_exit_key");
if (!consumer_exit_key) consumer_exit_key = "*";
if (dtmf->digit == *consumer_exit_key) {
switch_channel_hangup(bchan, SWITCH_CAUSE_NORMAL_CLEARING);
return SWITCH_STATUS_BREAK;
} else if (dtmf->digit == '0') {
......@@ -423,6 +501,13 @@ static switch_status_t on_dtmf(switch_core_session_t *session, void *input, swit
return SWITCH_STATUS_SUCCESS;
}
/*!\brief Handler for caller DTMF
*
* The channel variable `fifo_caller_exit_key` can be set to one or
* more digits that when pressed will cause the caller to exit from
* the fifo. We'll return via a single character in `buf` the digit
* that was pressed (not null-terminated).
*/
static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
{
switch (itype) {
......@@ -432,7 +517,7 @@ static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input,
switch_channel_t *channel = switch_core_session_get_channel(session);
const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
if (match_key(caller_exit_key, dtmf->digit)) {
if (caller_exit_key && dtmf->digit && strchr(caller_exit_key, dtmf->digit)) {
char *bp = buf;
*bp = dtmf->digit;
return SWITCH_STATUS_BREAK;
......@@ -446,7 +531,9 @@ static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input,
return SWITCH_STATUS_SUCCESS;
}
#define check_string(s) if (!zstr(s) && !strcasecmp(s, "undef")) { s = NULL; }
static inline void cleanup_fifo_arg(const char **s) {
if (!zstr(*s) && !strcasecmp(*s, "undef")) *s = NULL;
}
static int node_caller_count(fifo_node_t *node)
{
......@@ -476,6 +563,12 @@ static void node_remove_uuid(fifo_node_t *node, const char *uuid)
return;
}
/*!\struct fifo_chime_data
*
* \var fifo_chime_data::list
* A list of strings representing things to play back to the caller
* while they are waiting to be connected with an agent.
*/
#define MAX_CHIME 25
struct fifo_chime_data {
char *list[MAX_CHIME];
......@@ -494,6 +587,11 @@ struct fifo_chime_data {
typedef struct fifo_chime_data fifo_chime_data_t;
/*!\brief Enforce the `fifo_orbit_timeout`
*
* If the caller has been waiting longer than the `fifo_orbit_timeout`
* we break out so the orbit can do something else with the call.
*/
static switch_status_t chime_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
{
fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
......@@ -506,7 +604,11 @@ static switch_status_t chime_read_frame_callback(switch_core_session_t *session,
return SWITCH_STATUS_SUCCESS;
}
/*!\brief Handle chimes and timeouts for callers
*
* Play back the chimes in order spaced out by the given `freq` while
* ensuring that we don't exceed the `orbit_timeout`.
*/
static switch_status_t caller_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
{
fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
......@@ -530,14 +632,14 @@ static switch_status_t caller_read_frame_callback(switch_core_session_t *session
args.buflen = sizeof(buf);
args.read_frame_callback = chime_read_frame_callback;
args.user_data = user_data;
status = switch_ivr_play_file(session, NULL, cd->list[cd->index], &args);
if (match_key(cd->exit_key, *buf)) {
if (cd->exit_key && *buf && strchr(cd->exit_key, *buf)) {
cd->abort = 1;
return SWITCH_STATUS_BREAK;
}
if (status != SWITCH_STATUS_SUCCESS) {
return SWITCH_STATUS_BREAK;
}
......@@ -550,18 +652,23 @@ static switch_status_t caller_read_frame_callback(switch_core_session_t *session
return chime_read_frame_callback(session, frame, user_data);
}
/*!\brief Handler for waiting consumers
*
* In `user_data` we'll be passed an array of fifo_nodes representing
* the fifos for which this consumer will accept calls. If any of
* those fifos have a caller in them, we break out so we can accept
* the call.
*/
static switch_status_t consumer_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
{
fifo_node_t *node, **node_list = (fifo_node_t **) user_data;
int x = 0, total = 0, i = 0;
int total = 0, i = 0;
for (i = 0;; i++) {
if (!(node = node_list[i])) {
break;
}
for (x = 0; x < MAX_PRI; x++) {
total += fifo_queue_size(node->fifo_list[x]);
}
total += node_caller_count(node);
}
if (total) {
......@@ -571,8 +678,6 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi
return SWITCH_STATUS_SUCCESS;
}
struct fifo_node;
static struct {
switch_hash_t *caller_orig_hash;
switch_hash_t *consumer_orig_hash;
......@@ -600,18 +705,17 @@ static struct {
char *pre_trans_execute;
char *post_trans_execute;
char *inner_pre_trans_execute;
char *inner_post_trans_execute;
char *inner_post_trans_execute;
switch_sql_queue_manager_t *qm;
int allow_transcoding;
switch_bool_t delete_all_members_on_startup;
outbound_strategy_t default_strategy;
} globals;
static int fifo_dec_use_count(const char *outbound_id)
{
int r = 0, *count;
switch_mutex_lock(globals.use_mutex);
if ((count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
if (*count > 0) {
......@@ -619,11 +723,11 @@ static int fifo_dec_use_count(const char *outbound_id)
}
}
switch_mutex_unlock(globals.use_mutex);
return r;
}
static int fifo_get_use_count(const char *outbound_id)
static int fifo_get_use_count(const char *outbound_id)
{
int r = 0, *count;
......@@ -632,12 +736,11 @@ static int fifo_get_use_count(const char *outbound_id)
r = *count;
}
switch_mutex_unlock(globals.use_mutex);
return r;
}
static int fifo_inc_use_count(const char *outbound_id)
static int fifo_inc_use_count(const char *outbound_id)
{
int r = 0, *count;
......@@ -650,11 +753,11 @@ static int fifo_inc_use_count(const char *outbound_id)
r = ++(*count);
switch_mutex_unlock(globals.use_mutex);
return r;
}
static void fifo_init_use_count(void)
static void fifo_init_use_count(void)
{
switch_mutex_lock(globals.use_mutex);
if (globals.use_hash) {
......@@ -664,9 +767,6 @@ static void fifo_init_use_count(void)
switch_mutex_unlock(globals.use_mutex);
}
static int check_caller_outbound_call(const char *key)
{
int x = 0;
......@@ -677,10 +777,8 @@ static int check_caller_outbound_call(const char *key)
x = !!switch_core_hash_find(globals.caller_orig_hash, key);
switch_mutex_unlock(globals.caller_orig_mutex);
return x;
}
static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
{
if (!key) return;
......@@ -712,8 +810,6 @@ static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cau
switch_mutex_unlock(globals.caller_orig_mutex);
}
static int check_bridge_call(const char *key)
{
int x = 0;
......@@ -724,12 +820,11 @@ static int check_bridge_call(const char *key)
x = !!switch_core_hash_find(globals.bridge_hash, key);
switch_mutex_unlock(globals.bridge_mutex);
return x;
}
static void add_bridge_call(const char *key)
{
static int marker = 1;
if (!key) return;
switch_mutex_lock(globals.bridge_mutex);
......@@ -744,7 +839,6 @@ static void del_bridge_call(const char *key)
switch_mutex_unlock(globals.bridge_mutex);
}
static int check_consumer_outbound_call(const char *key)
{
int x = 0;
......@@ -755,7 +849,6 @@ static int check_consumer_outbound_call(const char *key)
x = !!switch_core_hash_find(globals.consumer_orig_hash, key);
switch_mutex_unlock(globals.consumer_orig_mutex);
return x;
}
static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
......@@ -787,17 +880,13 @@ static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t c
*cancel_cause = cause;
}
switch_mutex_unlock(globals.consumer_orig_mutex);
}
switch_cache_db_handle_t *fifo_get_db_handle(void)
{
switch_cache_db_handle_t *dbh = NULL;
char *dsn;
if (!zstr(globals.odbc_dsn)) {
dsn = globals.odbc_dsn;
} else {
......@@ -807,7 +896,7 @@ switch_cache_db_handle_t *fifo_get_db_handle(void)
if (switch_cache_db_get_db_handle_dsn(&dbh, dsn) != SWITCH_STATUS_SUCCESS) {
dbh = NULL;
}
return dbh;
}
......@@ -817,8 +906,7 @@ static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_al
char *sql;
switch_assert(sqlp && *sqlp);
sql = *sqlp;
sql = *sqlp;
if (switch_stristr("insert", sql)) {
index = 0;
......@@ -835,7 +923,6 @@ static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_al
}
return SWITCH_STATUS_SUCCESS;
}
#if 0
static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
......@@ -911,23 +998,19 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
char outbound_count[80] = "";
callback_t cbt = { 0 };
char *sql = NULL;
char *domain_name = NULL;
if (!globals.running) {
return NULL;
}
switch_core_new_memory_pool(&pool);
node = switch_core_alloc(pool, sizeof(*node));
node->pool = pool;
node->outbound_strategy = default_strategy;
node->outbound_strategy = globals.default_strategy;
node->name = switch_core_strdup(node->pool, name);
if (!strchr(name, '@')) {
domain_name = switch_core_get_domain(SWITCH_TRUE);
node->domain_name = switch_core_strdup(node->pool, domain_name);
node->domain_name = switch_core_strdup(node->pool, switch_core_get_domain(SWITCH_FALSE));
}
for (x = 0; x < MAX_PRI; x++) {
......@@ -943,12 +1026,8 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
node->member_count = atoi(outbound_count);
if (node->member_count > 0) {
node->has_outbound = 1;
} else {
node->has_outbound = 0;
}
node->member_count = atoi(outbound_count);
node->has_outbound = (node->member_count > 0) ? 1 : 0;
switch_safe_free(sql);
node->importance = importance;
......@@ -960,8 +1039,6 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
globals.nodes = node;
switch_mutex_unlock(globals.mutex);
switch_safe_free(domain_name);
return node;
}
......@@ -986,7 +1063,6 @@ static int node_idle_consumers(fifo_node_t *node)
switch_mutex_unlock(node->mutex);
return total;
}
struct call_helper {
......@@ -1006,6 +1082,8 @@ struct callback_helper {
int ready;
};
/*!\brief Handle unbridging of manually tracked calls
*/
static void do_unbridge(switch_core_session_t *consumer_session, switch_core_session_t *caller_session)
{
switch_channel_t *consumer_channel = switch_core_session_get_channel(consumer_session);
......@@ -1033,15 +1111,13 @@ static void do_unbridge(switch_core_session_t *consumer_session, switch_core_ses
if ((outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid"))) {
use_count = fifo_get_use_count(outbound_id);
}
ts = switch_micro_time_now();
switch_time_exp_lt(&tm, ts);
switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session));
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING");
switch_channel_set_variable(consumer_channel, "fifo_timestamp", date);
......@@ -1093,7 +1169,8 @@ static void do_unbridge(switch_core_session_t *consumer_session, switch_core_ses
}
}
/*!\brief Handle session messages for manually tracked calls
*/
static switch_status_t messagehook (switch_core_session_t *session, switch_core_session_message_t *msg)
{
switch_event_t *event;
......@@ -1112,6 +1189,15 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
case SWITCH_MESSAGE_INDICATE_BRIDGE:
case SWITCH_MESSAGE_INDICATE_UNBRIDGE:
if (msg->numeric_arg == 42) {
/* See audio_bridge_thread() for source of 42 constant. */
/* When a session is interrupted to execute an application
(e.g. by uuid_execute) we need to tell everything in FS
to unbridge the channel (e.g. to turn on the
jitterbuffer) but we need mod_fifo not to see the
unbridge because we don't want fifo to stop tracking
the call. So this magic number is a complete hack to
make this happen. So we ignore it here and simply fall
through. */
goto end;
}
if ((caller_session = switch_core_session_locate(msg->string_arg))) {
......@@ -1135,11 +1221,9 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
goto end;
}
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_BRIDGE:
{
long epoch_start = 0;
char date[80] = "";
switch_time_t ts;
......@@ -1234,7 +1318,6 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
epoch_start = (long)switch_epoch_time_now(NULL);
ts = switch_micro_time_now();
......@@ -1276,8 +1359,21 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
return SWITCH_STATUS_SUCCESS;
}
static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj)
/*!\brief Create calls to outbound members with ringall strategy
*
* A fifo node has been selected for us and we've been given a list of
* outbound members to ring. We're going to pick a single caller by
* searching through the fifo node queues in order of priority
* (`fifo_priority`) from lowest to highest. We'll look first for
* callers with fifo_vip=true. Finding none, we'll consider the
* plebs.
*
* Once we have a caller to service, we'll set fifo_bridge_uuid for
* that caller to let the fifo application in on our decision. Our
* job being done, we'll let the fifo application deal with the
* remaining details.
*/
static void *SWITCH_THREAD_FUNC outbound_ringall_thread_run(switch_thread_t *thread, void *obj)
{
struct callback_helper *cbh = (struct callback_helper *) obj;
char *node_name;
......@@ -1360,7 +1456,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
goto end;
}
if (node) {
switch_mutex_lock(node->update_mutex);
node->busy = 0;
......@@ -1376,14 +1471,12 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
switch_assert(ovars);
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
char *parsed = NULL;
int use_ent = 0;
char *expanded_originate_string = switch_event_expand_headers(ovars, h->originate_string);
if (strstr(expanded_originate_string, "user/")) {
switch_event_create_brackets(expanded_originate_string, '<', '>', ',', &ovars, &parsed, SWITCH_TRUE);
use_ent = 1;
......@@ -1398,7 +1491,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
if (use_ent) {
stream.write_function(&stream, "{ignore_early_media=true,outbound_redirect_fatal=true,leg_timeout=%d,fifo_outbound_uuid=%s,fifo_name=%s}%s%s",
h->timeout, h->uuid, node->name,
h->timeout, h->uuid, node->name,
parsed ? parsed : expanded_originate_string, (i == cbh->rowcount - 1) ? "" : SWITCH_ENT_ORIGINATE_DELIM);
} else {
stream.write_function(&stream, "[leg_timeout=%d,fifo_outbound_uuid=%s,fifo_name=%s]%s,",
......@@ -1411,7 +1504,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
if (expanded_originate_string && expanded_originate_string != h->originate_string) {
switch_safe_free(expanded_originate_string);
}
}
originate_string = (char *) stream.data;
......@@ -1474,7 +1566,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_originate_uuid", uuid_str);
if ((export = switch_event_get_header(pop, "variable_fifo_export"))) {
int argc;
char *argv[100] = { 0 };
......@@ -1496,7 +1587,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_safe_free(mydata);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_core_session_t *session;
if (id && (session = switch_core_session_locate(id))) {
......@@ -1522,12 +1612,9 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
if (!total) goto end;
if (!globals.allow_transcoding && !switch_true(switch_event_get_header(pop, "variable_fifo_allow_transcoding")) &&
if (!globals.allow_transcoding && !switch_true(switch_event_get_header(pop, "variable_fifo_allow_transcoding")) &&
(codec = switch_event_get_header(pop, "variable_rtp_use_codec_name"))) {
const char *rate = switch_event_get_header(pop, "variable_rtp_use_codec_rate");
const char *ptime = switch_event_get_header(pop, "variable_rtp_use_codec_ptime");
......@@ -1550,7 +1637,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
del_caller_outbound_call(id);
if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) {
const char *acceptable = "false";
......@@ -1566,7 +1652,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
"where uuid='%q' and ring_count > 0", h->uuid);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
}
break;
default:
......@@ -1579,7 +1664,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
"next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
(long) switch_epoch_time_now(NULL) + node->retry_delay, h->uuid);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
}
}
......@@ -1615,7 +1699,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_event_fire(&event);
}
switch_channel_set_variable(channel, "fifo_pop_order", NULL);
app_name = "fifo";
......@@ -1629,9 +1712,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_core_session_rwunlock(session);
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid);
......@@ -1650,7 +1730,6 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_thread_rwlock_unlock(node->rwlock);
}
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
del_consumer_outbound_call(h->uuid);
......@@ -1679,7 +1758,16 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
return NULL;
}
static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
/*!\brief Send a call to an outbound member with the enterprise strategy
*
* A fifo and an outbound member have been picked out for us and our
* job is to create a channel to the member and deliver that channel
* into the `fifo <fifo> out` application.
*
* We haven't picked a caller yet, and we won't do so here. We'll let
* the fifo application take care of that work.
*/
static void *SWITCH_THREAD_FUNC outbound_enterprise_thread_run(switch_thread_t *thread, void *obj)
{
struct call_helper *h = (struct call_helper *) obj;
......@@ -1702,7 +1790,6 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
globals.threads++;
switch_mutex_unlock(globals.mutex);
switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, h->node_name);
switch_thread_rwlock_rdlock(node->rwlock);
......@@ -1734,7 +1821,6 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
"origination_caller_id_name=Queue,origination_caller_id_number='Queue: %q'}%s",
node->name, node->name, node->name, expanded_originate_string);
}
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
......@@ -1746,14 +1832,12 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
switch_event_fire(&event);
}
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL);
if (status != SWITCH_STATUS_SUCCESS) {
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
"outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'",
(long) switch_epoch_time_now(NULL) + (node ? node->retry_delay : 0), h->uuid);
......@@ -1786,7 +1870,6 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
switch_event_fire(&event);
}
if ((member_wait = switch_channel_get_variable(channel, "fifo_member_wait")) || (member_wait = switch_channel_get_variable(channel, "member_wait"))) {
if (strcasecmp(member_wait, "wait") && strcasecmp(member_wait, "nowait")) {
member_wait = NULL;
......@@ -1834,6 +1917,9 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
return NULL;
}
/*!\brief Extract the outbound member results and accumulate them for
* the ringall strategy handler
*/
static int place_call_ringall_callback(void *pArg, int argc, char **argv, char **columnNames)
{
struct callback_helper *cbh = (struct callback_helper *) pArg;
......@@ -1856,12 +1942,13 @@ static int place_call_ringall_callback(void *pArg, int argc, char **argv, char *
}
return 0;
}
/*!\brief Extract the outbound member results and invoke the
* enterprise strategy handler
*/
static int place_call_enterprise_callback(void *pArg, int argc, char **argv, char **columnNames)
{
int *need = (int *) pArg;
switch_thread_t *thread;
......@@ -1877,22 +1964,32 @@ static int place_call_enterprise_callback(void *pArg, int argc, char **argv, cha
h->originate_string = switch_core_strdup(h->pool, argv[2]);
h->timeout = atoi(argv[5]);
switch_threadattr_create(&thd_attr, h->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, o_thread_run, h, h->pool);
switch_thread_create(&thread, thd_attr, outbound_enterprise_thread_run, h, h->pool);
(*need)--;
return *need ? 0 : -1;
}
/*!\brief Find outbound members to call for a given fifo node
*
* We're given a fifo node that has callers to be delivered to agents.
* Our job is to find available outbound members and pass them to the
* appropriate outbound strategy handler.
*
* The ringall strategy handler needs the full list of members to do
* its job, so we first let `place_call_ringall_callback` accumulate
* the results. The enterprise strategy handler can simply take each
* member one at a time, so the `place_call_enterprise_callback` takes
* care of invoking the handler.
*/
static void find_consumers(fifo_node_t *node)
{
char *sql;
sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
"next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
"from fifo_outbound "
......@@ -1901,8 +1998,6 @@ static void find_consumers(fifo_node_t *node)
node->name, (long) switch_epoch_time_now(NULL)
);
switch(node->outbound_strategy) {
case NODE_STRATEGY_ENTERPRISE:
{
......@@ -1913,7 +2008,6 @@ static void find_consumers(fifo_node_t *node)
}
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need);
}
break;
case NODE_STRATEGY_RINGALL:
......@@ -1938,21 +2032,33 @@ static void find_consumers(fifo_node_t *node)
switch_threadattr_create(&thd_attr, cbh->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
switch_thread_create(&thread, thd_attr, outbound_ringall_thread_run, cbh, cbh->pool);
} else {
switch_core_destroy_memory_pool(&pool);
}
}
break;
default:
break;
}
switch_safe_free(sql);
}
/*\brief Continuously attempt to deliver calls to outbound members
*
* For each outbound priority level 1-10, find fifo nodes with a
* matching priority. For each of those nodes with outbound members,
* run `find_consumers()` if the fifo node has calls needing to be
* delivered and not enough ready and waiting inbound consumers.
*
* In the event of nothing needing to be done, each cycle starts at
* priority 1 and ends at priority 10, yielding for one second
* afterward. We also yield after initiating outbound calls, starting
* again where we left off on the next node.
*
* We also take care of cleaning up after nodes queued for deletion.
*/
static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
{
fifo_node_t *node, *last, *this_node;
......@@ -1985,10 +2091,8 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
switch_event_destroy(&pop);
}
}
}
if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name);
......@@ -2027,7 +2131,6 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority);
}
if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
found++;
find_consumers(this_node);
......@@ -2035,7 +2138,6 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
}
}
}
if (++cur_priority > 10) {
cur_priority = 1;
......@@ -2087,7 +2189,6 @@ static void check_cancel(fifo_node_t *node)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Outbound call count (%d) exceeds required value for queue %s (%d), "
"Ending extraneous calls\n", node->ring_consumer_count, node->name, ppl_waiting);
switch_core_session_hupall_matching_var("fifo_hangup_check", node->name, SWITCH_CAUSE_ORIGINATOR_CANCEL);
}
}
......@@ -2156,7 +2257,6 @@ static void pres_event_handler(switch_event_t *event)
dup_node_name = switch_mprintf("%q@%q", node_name, domain_name);
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, node_name)) && !(node = switch_core_hash_find(globals.fifo_hash, dup_node_name))) {
node = create_node(node_name, 0, globals.sql_mutex);
......@@ -2207,7 +2307,6 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
switch_thread_rwlock_unlock(node->rwlock);
return i;
}
SWITCH_STANDARD_API(fifo_check_bridge_function)
......@@ -2242,17 +2341,14 @@ SWITCH_STANDARD_API(fifo_add_outbound_function)
stream->write_function(stream, "%d", fifo_add_outbound(argv[0], argv[1], priority));
free(data);
return SWITCH_STATUS_SUCCESS;
fail:
free(data);
stream->write_function(stream, "0");
return SWITCH_STATUS_SUCCESS;
}
static void dec_use_count(switch_core_session_t *session, const char *type)
......@@ -2266,7 +2362,6 @@ static void dec_use_count(switch_core_session_t *session, const char *type)
if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s untracking call on uuid %s!\n", switch_channel_get_name(channel), outbound_id);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
......@@ -2353,7 +2448,6 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s tracking call on uuid %s!\n", switch_channel_get_name(channel), data);
if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
col1 = "manual_calls_in_count";
col2 = "manual_calls_in_total_count";
......@@ -2388,7 +2482,6 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
}
}
static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session)
{
char *sql;
......@@ -2416,11 +2509,8 @@ static void fifo_caller_del(const char *uuid)
}
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
typedef enum {
STRAT_MORE_PPL,
STRAT_WAITING_LONGER,
......@@ -2450,7 +2540,7 @@ SWITCH_STANDARD_APP(fifo_function)
const char *arg_fifo_name = NULL;
const char *arg_inout = NULL;
const char *serviced_uuid = NULL;
if (!globals.running) {
return;
}
......@@ -2506,7 +2596,6 @@ SWITCH_STANDARD_APP(fifo_function)
}
}
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
node = create_node(nlist[i], importance, globals.sql_mutex);
node->ready = 1;
......@@ -2543,8 +2632,8 @@ SWITCH_STANDARD_APP(fifo_function)
moh = NULL;
}
check_string(announce);
check_string(moh);
cleanup_fifo_arg(&announce);
cleanup_fifo_arg(&moh);
switch_assert(node);
switch_core_media_bug_pause(session);
......@@ -2620,7 +2709,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
switch_channel_event_set_data(channel, call_event);
fifo_queue_push(node->fifo_list[p], call_event);
fifo_caller_add(node, session);
in_table = 1;
......@@ -2691,12 +2779,11 @@ SWITCH_STANDARD_APP(fifo_function)
goto abort;
}
if (match_key(caller_exit_key, *buf)) {
if (caller_exit_key && *buf && strchr(caller_exit_key, *buf)) {
switch_channel_set_variable(channel, "fifo_caller_exit_key", (char *)buf);
aborted = 1;
goto abort;
}
}
if (!serviced_uuid && switch_channel_ready(channel)) {
......@@ -2747,7 +2834,6 @@ SWITCH_STANDARD_APP(fifo_function)
send_presence(node);
check_cancel(node);
switch_mutex_unlock(globals.mutex);
}
if ((switch_true(switch_channel_get_variable(channel, "fifo_caller_exit_to_orbit")) || cd.do_orbit) && cd.orbit_exten) {
......@@ -2763,7 +2849,6 @@ SWITCH_STANDARD_APP(fifo_function)
cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
goto done;
} else { /* consumer */
switch_event_t *pop = NULL;
switch_frame_t *read_frame;
......@@ -2905,13 +2990,39 @@ SWITCH_STANDARD_APP(fifo_function)
}
}
/* Before we can pick a caller we have to decide on a fifo
node to service if the consumer can service more than
one.
If all fifos have an importance of zero, we'll find the
first node that wins based on the chosen strategy.
The `waiting_longer` strategy will choose the node that
hasn't been empty for the longest time.
The `more_ppl` strategy will choose the node that has
the most people waiting.
If a node has an importance value set, it will cause us
to ignore later nodes with equivalent or lower
importance values. This means that a node with the
same importance that would otherwise win based on the
strategy will not be considered at all if it comes
later in the list. Note also that the high importance
node may still lose if a considered fifo earlier in the
list beats it per the strategy.
Note that when the consumer has been delivered by an
outbound strategy there will only be one fifo node
passed to us, so neither the importance nor the
strategy here will have any effect.
*/
for (i = 0; i < node_count; i++) {
if (!(node = node_list[i])) {
continue;
}
if ((waiting = node_caller_count(node))) {
if (!importance || node->importance > importance) {
if (strat == STRAT_WAITING_LONGER) {
if (node->start_waiting < longest) {
......@@ -2943,6 +3054,7 @@ SWITCH_STANDARD_APP(fifo_function)
check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required");
/* Handle predestined calls, including calls from the ringall strategy */
if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
if (check_bridge_call(varval) && switch_true(check)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
......@@ -3049,7 +3161,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Outbound-URL", url);
switch_event_fire(&event);
}
} else {
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(channel, event);
......@@ -3062,7 +3173,6 @@ SWITCH_STANDARD_APP(fifo_function)
url = NULL;
caller_uuid = switch_core_session_strdup(session, switch_core_session_get_uuid(other_session));
}
} else {
if ((other_session = switch_core_session_locate(caller_uuid))) {
switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
......@@ -3103,7 +3213,6 @@ SWITCH_STANDARD_APP(fifo_function)
}
}
switch_channel_set_variable(other_channel, "fifo_serviced_by", my_id);
switch_channel_set_variable(other_channel, "fifo_serviced_uuid", switch_core_session_get_uuid(session));
switch_channel_set_flag(other_channel, CF_BREAK);
......@@ -3119,18 +3228,17 @@ SWITCH_STANDARD_APP(fifo_function)
if (!(switch_channel_ready(channel))) {
const char *app = switch_channel_get_variable(other_channel, "current_application");
const char *arg = switch_channel_get_variable(other_channel, "current_application_data");
switch_caller_extension_t *extension = NULL;
switch_caller_extension_t *extension = NULL;
switch_channel_set_variable_printf(channel, "last_sent_callee_id_name", "%s (AGENT FAIL)",
switch_channel_set_variable_printf(channel, "last_sent_callee_id_name", "%s (AGENT FAIL)",
switch_channel_get_variable(other_channel, "caller_id_name"));
switch_channel_set_variable(channel, "last_sent_callee_id_number", switch_channel_get_variable(other_channel, "caller_id_number"));
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING,
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING,
"Customer %s %s [%s] appears to be abandoned by agent %s [%s] "
"but is still on the line, redirecting them back to the queue with VIP status.\n",
switch_channel_get_name(other_channel),
switch_channel_get_variable(other_channel, "caller_id_name"),
switch_channel_get_name(other_channel),
switch_channel_get_variable(other_channel, "caller_id_name"),
switch_channel_get_variable(other_channel, "caller_id_number"),
switch_channel_get_variable(channel, "caller_id_name"),
switch_channel_get_variable(channel, "caller_id_number"));
......@@ -3139,7 +3247,7 @@ SWITCH_STANDARD_APP(fifo_function)
send_presence(node);
check_cancel(node);
if (app) {
extension = switch_caller_extension_new(other_session, app, arg);
switch_caller_extension_add_application(other_session, extension, app, arg);
......@@ -3162,30 +3270,24 @@ SWITCH_STANDARD_APP(fifo_function)
}
}
switch_channel_step_caller_profile(channel);
switch_channel_step_caller_profile(other_channel);
originator_cp = switch_channel_get_caller_profile(channel);
originatee_cp = switch_channel_get_caller_profile(other_channel);
switch_channel_set_originator_caller_profile(other_channel, switch_caller_profile_clone(other_session, originator_cp));
switch_channel_set_originatee_caller_profile(channel, switch_caller_profile_clone(session, originatee_cp));
originator_cp->callee_id_name = switch_core_strdup(originator_cp->pool, originatee_cp->callee_id_name);
originator_cp->callee_id_number = switch_core_strdup(originator_cp->pool, originatee_cp->callee_id_number);
originatee_cp->callee_id_name = switch_core_strdup(originatee_cp->pool, originatee_cp->caller_id_name);
originatee_cp->callee_id_number = switch_core_strdup(originatee_cp->pool, originatee_cp->caller_id_number);
originatee_cp->caller_id_name = switch_core_strdup(originatee_cp->pool, originator_cp->caller_id_name);
originatee_cp->caller_id_number = switch_core_strdup(originatee_cp->pool, originator_cp->caller_id_number);
ts = switch_micro_time_now();
switch_time_exp_lt(&tm, ts);
epoch_start = (long)switch_epoch_time_now(NULL);
......@@ -3215,7 +3317,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_process_import(session, other_channel, "fifo_caller_consumer_import", switch_channel_get_variable(channel, "fifo_import_prefix"));
switch_process_import(other_session, channel, "fifo_consumer_caller_import", switch_channel_get_variable(other_channel, "fifo_import_prefix"));
if (outbound_id) {
cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
add_bridge_call(outbound_id);
......@@ -3223,10 +3324,8 @@ SWITCH_STANDARD_APP(fifo_function)
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'",
switch_epoch_time_now(NULL), outbound_id);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
fifo_inc_use_count(outbound_id);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
......@@ -3243,8 +3342,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_fire(&event);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(channel, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
......@@ -3263,7 +3360,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_fire(&event);
}
add_bridge_call(switch_core_session_get_uuid(other_session));
add_bridge_call(switch_core_session_get_uuid(session));
......@@ -3279,10 +3375,8 @@ SWITCH_STANDARD_APP(fifo_function)
(long) switch_epoch_time_now(NULL)
);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session));
switch_channel_set_variable(other_channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(session));
......@@ -3297,7 +3391,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", NULL);
switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", NULL);
}
if (switch_channel_test_flag(channel, CF_TRANSFER) && switch_channel_up(channel)) {
switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", NULL);
switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", NULL);
......@@ -3315,10 +3409,8 @@ SWITCH_STANDARD_APP(fifo_function)
del_bridge_call(outbound_id);
fifo_dec_use_count(outbound_id);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
uint64_t hold_usec = 0, tt_usec = 0;
switch_channel_event_set_data(channel, event);
......@@ -3340,14 +3432,13 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
switch_event_fire(&event);
}
del_bridge_call(switch_core_session_get_uuid(session));
del_bridge_call(switch_core_session_get_uuid(other_session));
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(channel, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
......@@ -3355,7 +3446,7 @@ SWITCH_STANDARD_APP(fifo_function)
if (outbound_id) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
}
}
switch_event_fire(&event);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
......@@ -3385,7 +3476,6 @@ SWITCH_STANDARD_APP(fifo_function)
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
if (switch_channel_ready(channel)) {
switch_core_media_bug_pause(session);
}
......@@ -3411,7 +3501,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_core_session_rwunlock(other_session);
if (!do_wait || !switch_channel_ready(channel)) {
break;
}
......@@ -3469,7 +3558,6 @@ SWITCH_STANDARD_APP(fifo_function)
if ((terminator == *fifo_consumer_wrapup_key) || !(switch_channel_ready(channel))) {
break;
}
}
} else if (fifo_consumer_wrapup_time && (zstr(fifo_consumer_wrapup_key) || !strcmp(buf, fifo_consumer_wrapup_key))) {
while (switch_channel_ready(channel)) {
......@@ -3516,7 +3604,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s is still alive, tracking call.\n", switch_channel_get_name(channel));
fifo_track_call_function(session, outbound_id);
}
}
done:
......@@ -3535,7 +3622,7 @@ SWITCH_STANDARD_APP(fifo_function)
continue;
}
switch_thread_rwlock_unlock(node->rwlock);
if (node->ready == 1 && do_destroy && node_caller_count(node) == 0 && node->consumer_count == 0) {
switch_core_hash_delete(globals.fifo_hash, node->name);
node->ready = 0;
......@@ -3589,14 +3676,12 @@ static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
}
}
if (atoi(argv[13])) {
arg = 17;
} else {
arg = 18;
}
if ((etime = atol(argv[arg]))) {
switch_size_t retsize;
switch_time_exp_lt(&tm, switch_time_from_sec(etime));
......@@ -3605,7 +3690,6 @@ static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
switch_set_string(atime, "now");
}
x_out = switch_xml_add_child_d(h->xml, h->tag, c_off++);
switch_xml_set_attr_d(x_out, "simo", argv[3]);
switch_xml_set_attr_d(x_out, "use_count", argv[4]);
......@@ -3654,7 +3738,6 @@ static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
switch_xml_set_attr_d(x_out, "stop-time", tb);
}
switch_xml_set_attr_d(x_out, "next-available", expires);
switch_xml_set_txt_d(x_out, argv[2]);
......@@ -3668,7 +3751,6 @@ static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, ch
char *sql;
if (!strcmp(node->name, MANUAL_QUEUE_NAME)) {
sql = switch_mprintf("select uuid, '%s', originate_string, simo_count, use_count, timeout,"
"lag, next_avail, expires, static, outbound_call_count, outbound_fail_count,"
"hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time,"
......@@ -3679,8 +3761,6 @@ static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, ch
"hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time,"
"manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count",
MANUAL_QUEUE_NAME);
} else {
sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
"lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, "
......@@ -3707,7 +3787,6 @@ static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, ch
return h.cc_off;
}
static int xml_bridge_callback(void *pArg, int argc, char **argv, char **columnNames)
{
struct xml_helper *h = (struct xml_helper *) pArg;
......@@ -3729,7 +3808,6 @@ static int xml_bridge_callback(void *pArg, int argc, char **argv, char **columnN
switch_set_string(exp_buf, "now");
}
x_bridge = switch_xml_add_child_d(h->xml, h->tag, h->row_off++);
switch_xml_set_attr_d(x_bridge, "fifo_name", argv[0]);
......@@ -3746,8 +3824,6 @@ static int xml_bridge_callback(void *pArg, int argc, char **argv, char **columnN
encoded = switch_url_encode(argv[3], url_buf, sizeof(url_buf));
switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
if (h->verbose) {
if ((session = switch_core_session_locate(argv[1]))) {
x_cdr = switch_xml_add_child_d(x_caller, "cdr", 0);
......@@ -3861,7 +3937,6 @@ static int xml_hash(switch_xml_t xml, switch_hash_t *hash, char *container, char
return cc_off;
}
static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
{
switch_xml_t x_tmp, x_caller, x_cp;
......@@ -3878,7 +3953,6 @@ static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char
switch_mutex_lock(q->mutex);
for (i = 0; i < q->idx; i++) {
int c_off = 0, d_off = 0;
const char *status;
const char *ts;
......@@ -3930,7 +4004,6 @@ static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char
switch_snprintf(sl, sizeof(sl), "%d", x);
switch_xml_set_attr_d_buf(x_caller, "slot", sl);
if (verbose) {
if (!(x_cp = switch_xml_add_child_d(x_caller, "cdr", d_off++))) {
abort();
......@@ -3981,7 +4054,7 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve
switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_priority);
switch_xml_set_attr_d(x_fifo, "outbound_priority", tmp);
switch_xml_set_attr_d(x_fifo, "outbound_strategy", strat_parse(node->outbound_strategy));
switch_xml_set_attr_d(x_fifo, "outbound_strategy", print_strategy(node->outbound_strategy));
cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose);
cc_off = xml_caller(x_fifo, node, "callers", "caller", cc_off, verbose);
......@@ -3989,7 +4062,6 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve
cc_off = xml_bridges(x_fifo, node, "bridges", "bridge", cc_off, verbose);
}
void dump_hash(switch_hash_t *hash, switch_stream_handle_t *stream)
{
switch_hash_index_t *hi;
......@@ -4006,8 +4078,6 @@ void dump_hash(switch_hash_t *hash, switch_stream_handle_t *stream)
void node_dump(switch_stream_handle_t *stream)
{
switch_hash_index_t *hi;
fifo_node_t *node;
void *val;
......@@ -4027,7 +4097,7 @@ void node_dump(switch_stream_handle_t *stream)
" waiting: %d\n"
,
node->name, node->outbound_name, node->outbound_per_cycle,
node->outbound_priority, strat_parse(node->outbound_strategy),
node->outbound_priority, print_strategy(node->outbound_strategy),
node->has_outbound,
node->outbound_priority,
node->busy,
......@@ -4046,13 +4116,9 @@ void node_dump(switch_stream_handle_t *stream)
dump_hash(globals.bridge_hash, stream);
switch_mutex_unlock(globals.mutex);
}
#define FIFO_API_SYNTAX "list|list_verbose|count|debug|status|importance [<fifo name>]|reparse [del_all]"
#define FIFO_API_SYNTAX "list|list_verbose|count|debug|status|has_outbound|importance [<fifo name>]|reparse [del_all]"
SWITCH_STANDARD_API(fifo_api_function)
{
fifo_node_t *node;
......@@ -4073,7 +4139,6 @@ SWITCH_STANDARD_API(fifo_api_function)
switch_assert(data);
}
switch_mutex_lock(globals.mutex);
if (zstr(cmd) || (argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 1 || !argv[0]) {
......@@ -4130,7 +4195,6 @@ SWITCH_STANDARD_API(fifo_api_function)
stream->write_function(stream, "%s\n", xml_text);
switch_xml_free(x_report);
switch_safe_free(xml_text);
} else if (!strcasecmp(argv[0], "importance")) {
if (argv[1] && (node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
int importance = 0;
......@@ -4199,7 +4263,6 @@ SWITCH_STANDARD_API(fifo_api_function)
return SWITCH_STATUS_SUCCESS;
}
const char outbound_sql[] =
"create table fifo_outbound (\n"
" uuid varchar(255),\n"
......@@ -4230,7 +4293,6 @@ const char outbound_sql[] =
" stop_time integer not null default 0\n"
");\n";
const char bridge_sql[] =
"create table fifo_bridge (\n"
" fifo_name varchar(1024) not null,\n"
......@@ -4254,8 +4316,6 @@ const char callers_sql[] =
");\n"
;
static void extract_fifo_outbound_uuid(char *string, char *uuid, switch_size_t len)
{
switch_event_t *ovars;
......@@ -4263,7 +4323,6 @@ static void extract_fifo_outbound_uuid(char *string, char *uuid, switch_size_t l
const char *fifo_outbound_uuid;
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
switch_event_create_brackets(string, '{', '}', ',', &ovars, &parsed, SWITCH_TRUE);
if ((fifo_outbound_uuid = switch_event_get_header(ovars, "fifo_outbound_uuid"))) {
......@@ -4274,38 +4333,23 @@ static void extract_fifo_outbound_uuid(char *string, char *uuid, switch_size_t l
switch_event_destroy(&ovars);
}
static switch_status_t load_config(int reload, int del_all)
{
char *cf = "fifo.conf";
switch_xml_t cfg, xml, fifo, fifos, member, settings, param;
switch_status_t status = SWITCH_STATUS_SUCCESS;
char *sql;
switch_bool_t delete_all_outbound_member_on_startup = SWITCH_FALSE;
switch_cache_db_handle_t *dbh = NULL;
fifo_node_t *node;
strncpy(globals.hostname, switch_core_get_switchname(), sizeof(globals.hostname) - 1);
static switch_status_t read_config_file(switch_xml_t *xml, switch_xml_t *cfg) {
const char *cf = "fifo.conf";
switch_xml_t settings;
if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
if (!(*xml = switch_xml_open_cfg(cf, cfg, NULL))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
return SWITCH_STATUS_TERM;
}
globals.dbname = "fifo";
if ((settings = switch_xml_child(cfg, "settings"))) {
if ((settings = switch_xml_child(*cfg, "settings"))) {
switch_xml_t param;
for (param = switch_xml_child(settings, "param"); param; param = param->next) {
char *var = NULL;
char *val = NULL;
var = (char *) switch_xml_attr_soft(param, "name");
val = (char *) switch_xml_attr_soft(param, "value");
char *var = (char*)switch_xml_attr_soft(param, "name");
char *val = (char*)switch_xml_attr_soft(param, "value");
if (!strcasecmp(var, "outbound-strategy") && !zstr(val)) {
default_strategy = parse_strat(val);
}
if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
globals.default_strategy = parse_strategy(val);
} else if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
if (switch_odbc_available() || switch_pgsql_available()) {
switch_set_string(globals.odbc_dsn, val);
} else {
......@@ -4324,11 +4368,40 @@ static switch_status_t load_config(int reload, int del_all)
} else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) {
globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val);
} else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
delete_all_outbound_member_on_startup = switch_true(val);
globals.delete_all_members_on_startup = switch_true(val);
}
}
}
return SWITCH_STATUS_SUCCESS;
}
/*!\brief Load or reload the configuration
*
* On the initial load, non-static members are preserved unless the
* parameter `delete-all-outbound-members-on-startup` is set. The
* parameter `del_all` is ignored in this case.
*
* On reload, non-static members are preserved unless `del_all` is
* set.
*
* \param reload true if we're reloading the config
* \param del_all delete all outbound members when reloading;
* not used unless reload is true
*/
static switch_status_t load_config(int reload, int del_all)
{
switch_xml_t xml, cfg, fifo, fifos, member;
switch_status_t status = SWITCH_STATUS_SUCCESS;
char *sql;
switch_cache_db_handle_t *dbh = NULL;
fifo_node_t *node;
strncpy(globals.hostname, switch_core_get_switchname(), sizeof(globals.hostname) - 1);
globals.dbname = "fifo";
globals.default_strategy = NODE_STRATEGY_RINGALL;
globals.delete_all_members_on_startup = SWITCH_FALSE;
if ((status = read_config_file(&xml, &cfg)) != SWITCH_STATUS_SUCCESS) return status;
if (!(dbh = fifo_get_db_handle())) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot open DB!\n");
......@@ -4345,11 +4418,8 @@ static switch_status_t load_config(int reload, int del_all)
globals.post_trans_execute,
globals.inner_pre_trans_execute,
globals.inner_post_trans_execute);
switch_sql_queue_manager_start(globals.qm);
switch_cache_db_test_reactive(dbh, "delete from fifo_outbound where static = 1 or taking_calls < 0 or stop_time < 0",
"drop table fifo_outbound", outbound_sql);
switch_cache_db_test_reactive(dbh, "delete from fifo_bridge", "drop table fifo_bridge", bridge_sql);
......@@ -4378,7 +4448,7 @@ static switch_status_t load_config(int reload, int del_all)
switch_mutex_unlock(globals.mutex);
}
if ((reload && del_all) || (!reload && delete_all_outbound_member_on_startup)) {
if ((reload && del_all) || (!reload && globals.delete_all_members_on_startup)) {
sql = switch_mprintf("delete from fifo_outbound where hostname='%q'", globals.hostname);
} else {
sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
......@@ -4392,22 +4462,13 @@ static switch_status_t load_config(int reload, int del_all)
node->is_static = 0;
}
if ((fifos = switch_xml_child(cfg, "fifos"))) {
for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) {
const char *name, *outbound_strategy;
const char *name, *sp;
const char *val;
int imp = 0, outbound_per_cycle = 1, outbound_priority = 5;
int simo_i = 1;
int taking_calls_i = 1;
int timeout_i = 60;
int lag_i = 10;
int ring_timeout = 60;
int default_lag = 30;
name = switch_xml_attr(fifo, "name");
int i, importance = 0;
if (!name) {
if (!(name = switch_xml_attr(fifo, "name"))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fifo has no name!\n");
continue;
}
......@@ -4417,15 +4478,13 @@ static switch_status_t load_config(int reload, int del_all)
continue;
}
if ((val = switch_xml_attr(fifo, "importance"))) {
if ((imp = atoi(val)) < 0) {
imp = 0;
}
if ((val = switch_xml_attr(fifo, "importance")) && !(i = atoi(val)) < 0) {
importance = i;
}
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, name))) {
node = create_node(name, imp, globals.sql_mutex);
node = create_node(name, importance, globals.sql_mutex);
}
if ((val = switch_xml_attr(fifo, "outbound_name"))) {
......@@ -4433,74 +4492,57 @@ static switch_status_t load_config(int reload, int del_all)
}
switch_mutex_unlock(globals.mutex);
switch_assert(node);
switch_mutex_lock(node->mutex);
outbound_strategy = switch_xml_attr(fifo, "outbound_strategy");
if ((sp = switch_xml_attr(fifo, "outbound_strategy"))) {
node->outbound_strategy = parse_strategy(sp);
node->has_outbound = 1;
}
node->outbound_per_cycle = 1;
if ((val = switch_xml_attr(fifo, "outbound_per_cycle"))) {
if ((outbound_per_cycle = atoi(val)) < 0) {
outbound_per_cycle = 1;
if (!(i = atoi(val)) < 0) {
node->outbound_per_cycle = i;
}
node->has_outbound = 1;
}
if ((val = switch_xml_attr(fifo, "retry_delay"))) {
int tmp;
if ((tmp = atoi(val)) < 0) {
tmp = 0;
}
node->retry_delay = tmp;
if ((i = atoi(val)) < 0) i = 0;
node->retry_delay = i;
}
node->outbound_priority = 5;
if ((val = switch_xml_attr(fifo, "outbound_priority"))) {
outbound_priority = atoi(val);
if (outbound_priority < 1 || outbound_priority > 10) {
outbound_priority = 5;
i = atoi(val);
if (!(i < 1 || i > 10)) {
node->outbound_priority = i;
}
node->has_outbound = 1;
}
node->ring_timeout = 60;
if ((val = switch_xml_attr(fifo, "outbound_ring_timeout"))) {
int tmp = atoi(val);
if (tmp > 10) {
ring_timeout = tmp;
if ((i = atoi(val)) > 10) {
node->ring_timeout = i;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Invalid ring_timeout: must be > 10 for queue %s\n", node->name);
}
}
node->default_lag = 30;
if ((val = switch_xml_attr(fifo, "outbound_default_lag"))) {
int tmp = atoi(val);
if (tmp > 10) {
default_lag = tmp;
if ((i = atoi(val)) > 10) {
node->default_lag = i;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Invalid default_lag: must be > 10 for queue %s\n", node->name);
}
}
node->ring_timeout = ring_timeout;
node->outbound_per_cycle = outbound_per_cycle;
node->outbound_priority = outbound_priority;
node->default_lag = default_lag;
if (outbound_strategy) {
node->outbound_strategy = parse_strat(outbound_strategy);
node->has_outbound = 1;
}
for (member = switch_xml_child(fifo, "member"); member; member = member->next) {
const char *simo = switch_xml_attr_soft(member, "simo");
const char *lag = switch_xml_attr_soft(member, "lag");
const char *timeout = switch_xml_attr_soft(member, "timeout");
const char *taking_calls = switch_xml_attr_soft(member, "taking_calls");
char *name_dup, *p;
const char *simo, *taking_calls, *timeout, *lag;
int simo_i = 1, taking_calls_i = 1, timeout_i = 60, lag_i = 10;
char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
if (switch_stristr("fifo_outbound_uuid=", member->txt)) {
......@@ -4509,35 +4551,25 @@ static switch_status_t load_config(int reload, int del_all)
switch_md5_string(digest, (void *) member->txt, strlen(member->txt));
}
if (simo) {
if ((simo = switch_xml_attr_soft(member, "simo"))) {
simo_i = atoi(simo);
}
if (taking_calls) {
if ((taking_calls_i = atoi(taking_calls)) < 1) {
taking_calls_i = 1;
}
}
if (timeout) {
if ((timeout_i = atoi(timeout)) < 10) {
timeout_i = ring_timeout;
}
if ((taking_calls = switch_xml_attr_soft(member, "taking_calls"))
&& (taking_calls_i = atoi(taking_calls)) < 1) {
taking_calls_i = 1;
}
if (lag) {
if ((lag_i = atoi(lag)) < 0) {
lag_i = default_lag;
}
if ((timeout = switch_xml_attr_soft(member, "timeout"))
&& (timeout_i = atoi(timeout)) < 10) {
timeout_i = node->ring_timeout;
}
name_dup = strdup(node->name);
if ((p = strchr(name_dup, '@'))) {
*p = '\0';
if ((lag = switch_xml_attr_soft(member, "lag"))
&& (lag_i = atoi(lag)) < 0) {
lag_i = node->default_lag;
}
sql = switch_mprintf("insert into fifo_outbound "
"(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
"next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname, taking_calls, "
......@@ -4545,28 +4577,23 @@ static switch_status_t load_config(int reload, int del_all)
"values ('%q','%q','%q',%d,%d,%d,%d,0,0,1,0,0,'%q',%d,%ld,0)",
digest, node->name, member->txt, simo_i, 0, timeout_i, lag_i, globals.hostname, taking_calls_i,
(long) switch_epoch_time_now(NULL));
switch_assert(sql);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
free(name_dup);
node->has_outbound = 1;
node->member_count++;
}
node->ready = 1;
node->is_static = 1;
switch_mutex_unlock(node->mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s configured\n", node->name);
}
}
switch_xml_free(xml);
done:
switch_xml_free(xml);
if (reload) {
fifo_node_t *node;
switch_mutex_lock(globals.mutex);
for (node = globals.nodes; node; node = node->next) {
if (node->ready == -1) {
......@@ -4578,18 +4605,15 @@ static switch_status_t load_config(int reload, int del_all)
switch_mutex_unlock(globals.mutex);
}
return status;
}
static void fifo_member_add(char *fifo_name, char *originate_string, int simo_count, int timeout, int lag, time_t expires, int taking_calls)
{
char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
char *sql, *name_dup, *p;
char outbound_count[80] = "";
callback_t cbt = { 0 };
char outbound_count[80] = "";
callback_t cbt = { 0 };
fifo_node_t *node = NULL;
if (!fifo_name) return;
......@@ -4604,7 +4628,6 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
switch_assert(sql);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {
node = create_node(fifo_name, 0, globals.sql_mutex);
......@@ -4627,17 +4650,17 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(name_dup);
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", fifo_name);
fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
node->member_count = atoi(outbound_count);
if (node->member_count > 0) {
node->has_outbound = 1;
} else {
node->has_outbound = 0;
}
switch_safe_free(sql);
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", fifo_name);
fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
node->member_count = atoi(outbound_count);
if (node->member_count > 0) {
node->has_outbound = 1;
} else {
node->has_outbound = 0;
}
switch_safe_free(sql);
}
static void fifo_member_del(char *fifo_name, char *originate_string)
......@@ -4650,7 +4673,6 @@ static void fifo_member_del(char *fifo_name, char *originate_string)
if (!fifo_name) return;
if (switch_stristr("fifo_outbound_uuid=", originate_string)) {
extract_fifo_outbound_uuid(originate_string, digest, sizeof(digest));
} else {
......@@ -4672,11 +4694,11 @@ static void fifo_member_del(char *fifo_name, char *originate_string)
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name);
fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
node->member_count = atoi(outbound_count);
node->member_count = atoi(outbound_count);
if (node->member_count > 0) {
node->has_outbound = 1;
node->has_outbound = 1;
} else {
node->has_outbound = 0;
node->has_outbound = 0;
}
switch_safe_free(sql);
}
......@@ -4762,7 +4784,6 @@ SWITCH_STANDARD_API(fifo_member_api_function)
free(mydata);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
......@@ -4808,7 +4829,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
return status;
}
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
SWITCH_ADD_APP(app_interface, "fifo", "Park with FIFO", FIFO_DESC, fifo_function, FIFO_USAGE, SAF_NONE);
......@@ -4821,6 +4841,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
switch_console_set_complete("add fifo list");
switch_console_set_complete("add fifo list_verbose");
switch_console_set_complete("add fifo count");
switch_console_set_complete("add fifo debug");
switch_console_set_complete("add fifo status");
switch_console_set_complete("add fifo has_outbound");
switch_console_set_complete("add fifo importance");
switch_console_set_complete("add fifo reparse");
......@@ -4864,7 +4886,6 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
this_node = node;
node = node->next;
switch_mutex_lock(this_node->update_mutex);
switch_mutex_lock(this_node->mutex);
for (x = 0; x < MAX_PRI; x++) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论