提交 61f23283 authored 作者: Anthony Minessale's avatar Anthony Minessale

add some goodies to mod_fifo

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@8026 d0543943-73ff-0310-b7d9-9358b9ac24b2
上级 9b1ef84d
...@@ -216,7 +216,8 @@ ...@@ -216,7 +216,8 @@
--> -->
<extension name="park"> <extension name="park">
<condition field="destination_number" expression="^5900$"> <condition field="destination_number" expression="^5900$">
<action application="fifo" data="5900@$${domain} in undef $${moh_uri}"/> <action application="set" data="fifo_music=$${moh_uri}"/>
<action application="fifo" data="5900@$${domain} in"/>
</condition> </condition>
</extension> </extension>
......
...@@ -937,7 +937,8 @@ typedef enum { ...@@ -937,7 +937,8 @@ typedef enum {
SWITCH_FILE_PAUSE = (1 << 8), SWITCH_FILE_PAUSE = (1 << 8),
SWITCH_FILE_NATIVE = (1 << 9), SWITCH_FILE_NATIVE = (1 << 9),
SWITCH_FILE_SEEK = (1 << 10), SWITCH_FILE_SEEK = (1 << 10),
SWITCH_FILE_OPEN = (1 << 11) SWITCH_FILE_OPEN = (1 << 11),
SWITCH_FILE_CALLBACK = (1 << 12)
} switch_file_flag_t; } switch_file_flag_t;
typedef enum { typedef enum {
......
...@@ -45,8 +45,9 @@ struct fifo_node { ...@@ -45,8 +45,9 @@ struct fifo_node {
switch_hash_t *caller_hash; switch_hash_t *caller_hash;
switch_hash_t *consumer_hash; switch_hash_t *consumer_hash;
int caller_count; int caller_count;
int waiting_count;
int consumer_count; int consumer_count;
switch_time_t start_waiting;
uint32_t importance;
}; };
typedef struct fifo_node fifo_node_t; typedef struct fifo_node fifo_node_t;
...@@ -103,13 +104,109 @@ static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input, ...@@ -103,13 +104,109 @@ static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input,
#define check_string(s) if (!switch_strlen_zero(s) && !strcasecmp(s, "undef")) { s = NULL; } #define check_string(s) if (!switch_strlen_zero(s) && !strcasecmp(s, "undef")) { s = NULL; }
static switch_status_t read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) static int node_consumer_wait_count(fifo_node_t *node)
{ {
fifo_node_t *node = (fifo_node_t *) user_data; int i, len = 0;
int x = 0, total = 0;
for (x = 0; x < MAX_PRI; x++) { for (i = 0; i < MAX_PRI; i++) {
total += switch_queue_size(node->fifo_list[x]); len += switch_queue_size(node->fifo_list[i]);
}
return len;
}
static void node_remove_uuid(fifo_node_t *node, const char *uuid)
{
int i, len = 0;
void *pop = NULL;
for (i = 0; i < MAX_PRI; i++) {
if (!(len = switch_queue_size(node->fifo_list[i]))) {
continue;
}
while(len) {
if (switch_queue_trypop(node->fifo_list[i], &pop) == SWITCH_STATUS_SUCCESS && pop) {
if (!strcmp((char *)pop, uuid)) {
free(pop);
goto end;
}
switch_queue_push(node->fifo_list[i], pop);
}
len--;
}
}
end:
if (!node_consumer_wait_count(node)) {
node->start_waiting = 0;
}
return;
}
#define MAX_CHIME 25
struct fifo_chime_data {
char *list[MAX_CHIME];
int total;
int index;
time_t next;
int freq;
int abort;
int orbit_timeout;
int do_orbit;
char *orbit_exten;
};
typedef struct fifo_chime_data fifo_chime_data_t;
static switch_status_t caller_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
{
fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
if (cd && cd->total && switch_timestamp(NULL) >= cd->next) {
if (cd->index == MAX_CHIME || cd->index == cd->total || !cd->list[cd->index]) {
cd->index = 0;
}
if (cd->list[cd->index]) {
switch_input_args_t args = { 0 };
char buf[25] = "";
switch_channel_t *channel = switch_core_session_get_channel(session);
const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
args.input_callback = moh_on_dtmf;
args.buf = buf;
args.buflen = sizeof(buf);
switch_ivr_play_file(session, NULL, cd->list[cd->index], &args);
if (caller_exit_key && *buf == *caller_exit_key) {
cd->abort = 1;
return SWITCH_STATUS_FALSE;
}
cd->next = switch_timestamp(NULL) + cd->freq;
cd->index++;
}
} else if (cd->orbit_timeout && switch_timestamp(NULL) >= cd->orbit_timeout) {
cd->do_orbit = 1;
return SWITCH_STATUS_FALSE;
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t consumer_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
{
fifo_node_t *node, **node_list = (fifo_node_t **) user_data;
int x = 0, total = 0, i = 0;
for(i = 0; ; i++) {
if (!(node = node_list[i])) {
break;
}
for (x = 0; x < MAX_PRI; x++) {
total += switch_queue_size(node->fifo_list[x]);
}
} }
if (total) { if (total) {
...@@ -127,7 +224,7 @@ static struct { ...@@ -127,7 +224,7 @@ static struct {
} globals; } globals;
static fifo_node_t *create_node(const char *name) static fifo_node_t *create_node(const char *name, uint32_t importance)
{ {
fifo_node_t *node; fifo_node_t *node;
int x = 0; int x = 0;
...@@ -148,12 +245,15 @@ static fifo_node_t *create_node(const char *name) ...@@ -148,12 +245,15 @@ static fifo_node_t *create_node(const char *name)
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_core_hash_insert(globals.fifo_hash, name, node); switch_core_hash_insert(globals.fifo_hash, name, node);
node->importance = importance;
return node; return node;
} }
static void send_presence(fifo_node_t *node) static void send_presence(fifo_node_t *node)
{ {
switch_event_t *event; switch_event_t *event;
int wait_count = 0;
if (!globals.running) { if (!globals.running) {
return; return;
...@@ -163,8 +263,8 @@ static void send_presence(fifo_node_t *node) ...@@ -163,8 +263,8 @@ static void send_presence(fifo_node_t *node)
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", "%s", "park"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", "%s", "park");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", node->name); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", node->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s", node->name); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s", node->name);
if (node->waiting_count > 0) { if ((wait_count = node_consumer_wait_count(node)) > 0) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d waiting)", node->waiting_count); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d waiting)", wait_count);
} else { } else {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Idle"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Idle");
} }
...@@ -173,9 +273,9 @@ static void send_presence(fifo_node_t *node) ...@@ -173,9 +273,9 @@ static void send_presence(fifo_node_t *node)
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_count", "%d", 0); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_count", "%d", 0);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "channel-state", "%s", node->waiting_count > 0 ? "CS_RING" : "CS_HANGUP"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "channel-state", "%s", wait_count > 0 ? "CS_RING" : "CS_HANGUP");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "unique-id", "%s", node->name); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "unique-id", "%s", node->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "answer-state", "%s", node->waiting_count > 0 ? "early" : "terminated"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "answer-state", "%s", wait_count > 0 ? "early" : "terminated");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "call-direction", "%s", "inbound"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "call-direction", "%s", "inbound");
switch_event_fire(&event); switch_event_fire(&event);
} }
...@@ -203,7 +303,7 @@ static void pres_event_handler(switch_event_t *event) ...@@ -203,7 +303,7 @@ static void pres_event_handler(switch_event_t *event)
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) { if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) {
node = create_node(node_name); node = create_node(node_name, 0);
} }
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
...@@ -215,15 +315,16 @@ static void pres_event_handler(switch_event_t *event) ...@@ -215,15 +315,16 @@ static void pres_event_handler(switch_event_t *event)
switch_safe_free(dup_to); switch_safe_free(dup_to);
} }
#define MAX_NODES_PER_CONSUMER 25
#define FIFO_DESC "Fifo for stacking parked calls." #define FIFO_DESC "Fifo for stacking parked calls."
#define FIFO_USAGE "<fifo name> [in [<announce file>|undef] [<music file>|undef] | out [wait|nowait] [<announce file>|undef] [<music file>|undef]]" #define FIFO_USAGE "<fifo name> [in [<announce file>|undef] [<music file>|undef] | out [wait|nowait] [<announce file>|undef] [<music file>|undef]]"
SWITCH_STANDARD_APP(fifo_function) SWITCH_STANDARD_APP(fifo_function)
{ {
int argc; int argc;
char *mydata = NULL, *argv[5] = { 0 }; char *mydata = NULL, *argv[5] = { 0 };
fifo_node_t *node; fifo_node_t *node = NULL, *node_list[MAX_NODES_PER_CONSUMER+1] = { 0 };
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
int nowait = 0; int do_wait = 1, node_count = 0, i = 0;
const char *moh = NULL; const char *moh = NULL;
const char *announce = NULL; const char *announce = NULL;
switch_event_t *event = NULL; switch_event_t *event = NULL;
...@@ -231,6 +332,12 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -231,6 +332,12 @@ SWITCH_STANDARD_APP(fifo_function)
switch_time_exp_t tm; switch_time_exp_t tm;
switch_time_t ts = switch_timestamp_now(); switch_time_t ts = switch_timestamp_now();
switch_size_t retsize; switch_size_t retsize;
char *list_string;
int nlist_count;
char *nlist[MAX_NODES_PER_CONSUMER];
int consumer = 0;
const char *arg_fifo_name = NULL;
const char *arg_inout = NULL;
const char *serviced_uuid = NULL; const char *serviced_uuid = NULL;
if (!globals.running) { if (!globals.running) {
...@@ -244,44 +351,105 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -244,44 +351,105 @@ SWITCH_STANDARD_APP(fifo_function)
mydata = switch_core_session_strdup(session, data); mydata = switch_core_session_strdup(session, data);
switch_assert(mydata); switch_assert(mydata);
if ((argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 2 || !argv[0]) {
argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
arg_fifo_name = argv[0];
arg_inout = argv[1];
if (!arg_fifo_name && arg_inout) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
return; return;
} }
if (!strcasecmp(arg_inout, "out")) {
consumer = 1;
} else if (strcasecmp(arg_inout, "in")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
return;
}
list_string = switch_core_session_strdup(session, arg_fifo_name);
if (!(nlist_count = switch_separate_string(list_string, ',', nlist, (sizeof(nlist) / sizeof(nlist[0]))))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
return;
}
if (!consumer && nlist_count > 1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
return;
}
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, argv[0]))) { for(i = 0; i < nlist_count; i++) {
node = create_node(argv[0]); int importance = 0;
} char *p;
if ((p = strrchr(nlist[i], '!'))) {
*p++ = '\0';
importance = atoi(p);
if (importance < 0) {
importance = 0;
}
}
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
node = create_node(nlist[i], importance);
}
node_list[node_count++] = node;
}
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
moh = switch_channel_get_variable(channel, "fifo_music"); moh = switch_channel_get_variable(channel, "fifo_music");
announce = switch_channel_get_variable(channel, "fifo_announce"); announce = switch_channel_get_variable(channel, "fifo_announce");
check_string(announce);
check_string(moh);
if (!strcasecmp(argv[1], "in")) { if (!consumer && node) {
switch_core_session_t *other_session; switch_core_session_t *other_session;
switch_channel_t *other_channel; switch_channel_t *other_channel;
const char *uuid = strdup(switch_core_session_get_uuid(session)); const char *uuid = strdup(switch_core_session_get_uuid(session));
const char *pri; const char *pri;
char tmp[25] = "";
int p = 0; int p = 0;
int aborted = 0; int aborted = 0;
fifo_chime_data_t cd = { {0} };
const char *chime_list = switch_channel_get_variable(channel, "fifo_chime_list");
const char *chime_freq = switch_channel_get_variable(channel, "fifo_chime_freq");
const char *orbit_var = switch_channel_get_variable(channel, "fifo_orbit_exten");
const char *orbit_ann = switch_channel_get_variable(channel, "fifo_orbit_announce");
const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
int freq = 30;
int ftmp = 0;
int to = 60;
if (orbit_var) {
char *ot;
if ((cd.orbit_exten = switch_core_session_strdup(session, orbit_var))) {
if ((ot = strchr(cd.orbit_exten, ':'))) {
*ot++ = '\0';
if ((to = atoi(ot)) < 0) {
to = 60;
}
}
cd.orbit_timeout = switch_timestamp(NULL) + to;
}
}
if (chime_freq) {
ftmp = atoi(chime_freq);
if (ftmp > 0) {
freq = ftmp;
}
}
switch_channel_answer(channel);
if (argc > 2) {
announce = argv[2];
}
if (argc > 3) {
moh = argv[3];
}
check_string(announce); switch_channel_answer(channel);
check_string(moh);
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
node->caller_count++; node->caller_count++;
node->waiting_count++;
send_presence(node); send_presence(node);
switch_core_hash_insert(node->caller_hash, uuid, session); switch_core_hash_insert(node->caller_hash, uuid, session);
...@@ -292,11 +460,20 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -292,11 +460,20 @@ SWITCH_STANDARD_APP(fifo_function)
if (p >= MAX_PRI) { if (p >= MAX_PRI) {
p = MAX_PRI - 1; p = MAX_PRI - 1;
} }
if (!node_consumer_wait_count(node)) {
node->start_waiting = switch_timestamp_now();
}
switch_queue_push(node->fifo_list[p], (void *)uuid); switch_queue_push(node->fifo_list[p], (void *)uuid);
switch_mutex_unlock(node->mutex); if (!pri) {
switch_snprintf(tmp, sizeof(tmp), "%d", p);
switch_channel_set_variable(channel, "fifo_priority", tmp);
}
switch_mutex_unlock(node->mutex);
ts = switch_timestamp_now(); ts = switch_timestamp_now();
switch_time_exp_lt(&tm, ts); switch_time_exp_lt(&tm, ts);
switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
...@@ -307,19 +484,36 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -307,19 +484,36 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_event_set_data(channel, event); switch_channel_event_set_data(channel, event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Slot", "%d", p);
switch_event_fire(&event); switch_event_fire(&event);
} }
switch_channel_set_flag(channel, CF_TAGGED); switch_channel_set_flag(channel, CF_TAGGED);
if (chime_list) {
char *list_dup = switch_core_session_strdup(session, chime_list);
cd.total = switch_separate_string(list_dup, ',', cd.list, (sizeof(cd.list) / sizeof(cd.list[0])));
cd.freq = freq;
cd.next = switch_timestamp(NULL) + cd.freq;
}
while(switch_channel_ready(channel)) { while(switch_channel_ready(channel)) {
switch_input_args_t args = { 0 }; switch_input_args_t args = { 0 };
char buf[25] = ""; char buf[25] = "";
const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
args.input_callback = moh_on_dtmf; args.input_callback = moh_on_dtmf;
args.buf = buf; args.buf = buf;
args.buflen = sizeof(buf); args.buflen = sizeof(buf);
if (cd.total || cd.orbit_timeout) {
args.read_frame_callback = caller_read_frame_callback;
args.user_data = &cd;
}
if (cd.abort || cd.do_orbit) {
aborted = 1;
goto abort;
}
if ((serviced_uuid = switch_channel_get_variable(channel, "fifo_serviced_uuid"))) { if ((serviced_uuid = switch_channel_get_variable(channel, "fifo_serviced_uuid"))) {
break; break;
...@@ -367,25 +561,32 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -367,25 +561,32 @@ SWITCH_STANDARD_APP(fifo_function)
ts = switch_timestamp_now(); ts = switch_timestamp_now();
switch_time_exp_lt(&tm, ts); switch_time_exp_lt(&tm, ts);
switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
switch_channel_set_variable(channel, "fifo_status", "ABORTED"); switch_channel_set_variable(channel, "fifo_status", cd.do_orbit ? "TIMEOUT" : "ABORTED");
switch_channel_set_variable(channel, "fifo_timestamp", date); switch_channel_set_variable(channel, "fifo_timestamp", date);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(channel, event); switch_channel_event_set_data(channel, event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "abort"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", cd.do_orbit ? "timeout" : "abort");
switch_event_fire(&event); switch_event_fire(&event);
} }
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
node_remove_uuid(node, uuid);
node->caller_count--; node->caller_count--;
node->waiting_count--;
send_presence(node); send_presence(node);
switch_core_hash_delete(node->caller_hash, uuid); switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
} }
if (cd.do_orbit && cd.orbit_exten) {
if (orbit_ann) {
switch_ivr_play_file(session, NULL, orbit_ann, NULL);
}
switch_ivr_session_transfer(session, cd.orbit_exten, NULL, NULL);
}
return; return;
} else if (!strcasecmp(argv[1], "out")) { } else { /* consumer */
void *pop = NULL; void *pop = NULL;
switch_frame_t *read_frame; switch_frame_t *read_frame;
switch_status_t status; switch_status_t status;
...@@ -403,35 +604,31 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -403,35 +604,31 @@ SWITCH_STANDARD_APP(fifo_function)
char buf[5] = ""; char buf[5] = "";
if (!(my_id = switch_channel_get_variable(channel, "fifo_consumer_id"))) {
my_id = switch_core_session_get_uuid(session);
}
if (argc > 2) { if (argc > 2) {
if (!strcasecmp(argv[2], "nowait")) { if (!strcasecmp(argv[2], "nowait")) {
nowait++; do_wait = 0;
} else if (strcasecmp(argv[2], "wait")) { } else if (strcasecmp(argv[2], "wait")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
return; return;
} }
} }
if (argc > 3) {
announce = argv[3];
}
if (argc > 4) {
moh = argv[4]; if (!(my_id = switch_channel_get_variable(channel, "fifo_consumer_id"))) {
} my_id = switch_core_session_get_uuid(session);
}
check_string(announce);
check_string(moh);
if (!nowait) { if (do_wait) {
switch_mutex_lock(node->mutex); for (i = 0; i < node_count; i++) {
node->consumer_count++; if (!(node = node_list[i])) {
switch_core_hash_insert(node->consumer_hash, switch_core_session_get_uuid(session), session); continue;
switch_mutex_unlock(node->mutex); }
switch_mutex_lock(node->mutex);
node->consumer_count++;
switch_core_hash_insert(node->consumer_hash, switch_core_session_get_uuid(session), session);
switch_mutex_unlock(node->mutex);
}
switch_channel_answer(channel); switch_channel_answer(channel);
} }
...@@ -465,32 +662,69 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -465,32 +662,69 @@ SWITCH_STANDARD_APP(fifo_function)
} }
while(switch_channel_ready(channel)) { while(switch_channel_ready(channel)) {
int x = 0 ; int x = 0, winner = -1;
switch_time_t longest = 0xFFFFFFFFFFFFFFFF / 2;
uint32_t importance = 0;
pop = NULL; pop = NULL;
if (moh) { if (moh && do_wait) {
memset(&args, 0, sizeof(args)); memset(&args, 0, sizeof(args));
args.read_frame_callback = read_frame_callback; args.read_frame_callback = consumer_read_frame_callback;
args.user_data = node; args.user_data = node_list;
switch_ivr_play_file(session, NULL, moh, &args); switch_ivr_play_file(session, NULL, moh, &args);
} }
if (custom_pop) { for(i = 0; i < node_count; i++) {
for(x = 0; x < MAX_PRI; x++) { if (!(node = node_list[i])) {
if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) { continue;
break; }
if (node_consumer_wait_count(node)) {
if (!importance && node->start_waiting < longest) {
longest = node->start_waiting;
winner = i;
}
if (node->importance > importance) {
importance = node->importance;
winner = i;
} }
} }
}
if (winner > -1) {
node = node_list[winner];
} else { } else {
for(x = 0; x < MAX_PRI; x++) { node = NULL;
if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { }
break;
if (node) {
if (custom_pop) {
for(x = 0; x < MAX_PRI; x++) {
if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
} else {
for(x = 0; x < MAX_PRI; x++) {
if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
} }
} }
if (pop && !node_consumer_wait_count(node)) {
switch_mutex_lock(node->mutex);
node->start_waiting = 0;
switch_mutex_unlock(node->mutex);
}
} }
if (!pop) { if (!pop) {
if (nowait) { if (!do_wait) {
break; break;
} }
...@@ -506,7 +740,7 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -506,7 +740,7 @@ SWITCH_STANDARD_APP(fifo_function)
uuid = (char *) pop; uuid = (char *) pop;
pop = NULL; pop = NULL;
if ((other_session = switch_core_session_locate(uuid))) { if (node && (other_session = switch_core_session_locate(uuid))) {
switch_channel_t *other_channel = switch_core_session_get_channel(other_session); switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
switch_caller_profile_t *cloned_profile; switch_caller_profile_t *cloned_profile;
const char *o_announce = NULL; const char *o_announce = NULL;
...@@ -575,7 +809,6 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -575,7 +809,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_set_variable(other_channel, "fifo_timestamp", date); switch_channel_set_variable(other_channel, "fifo_timestamp", date);
switch_channel_set_variable(other_channel, "fifo_target", switch_core_session_get_uuid(session)); switch_channel_set_variable(other_channel, "fifo_target", switch_core_session_get_uuid(session));
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
node->waiting_count--;
send_presence(node); send_presence(node);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session); switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session);
...@@ -595,7 +828,7 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -595,7 +828,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_core_hash_delete(node->caller_hash, uuid); switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
switch_core_session_rwunlock(other_session); switch_core_session_rwunlock(other_session);
if (nowait) { if (!do_wait) {
done = 1; done = 1;
} }
...@@ -620,14 +853,14 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -620,14 +853,14 @@ SWITCH_STANDARD_APP(fifo_function)
} }
} }
} }
switch_safe_free(uuid); switch_safe_free(uuid);
if (done) { if (done) {
break; break;
} }
} }
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(channel, event); switch_channel_event_set_data(channel, event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]);
...@@ -635,15 +868,18 @@ SWITCH_STANDARD_APP(fifo_function) ...@@ -635,15 +868,18 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_fire(&event); switch_event_fire(&event);
} }
if (!nowait) { if (do_wait) {
switch_mutex_lock(node->mutex); for (i = 0; i < node_count; i++) {
switch_core_hash_delete(node->consumer_hash, switch_core_session_get_uuid(session)); if (!(node = node_list[i])) {
node->consumer_count--; continue;
switch_mutex_unlock(node->mutex); }
switch_mutex_lock(node->mutex);
switch_core_hash_delete(node->consumer_hash, switch_core_session_get_uuid(session));
node->consumer_count--;
switch_mutex_unlock(node->mutex);
}
} }
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
} }
} }
...@@ -718,15 +954,17 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve ...@@ -718,15 +954,17 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve
switch_xml_set_attr_d(x_fifo, "consumer_count", tmp); switch_xml_set_attr_d(x_fifo, "consumer_count", tmp);
switch_snprintf(tmp, sizeof(buffer), "%d", node->caller_count); switch_snprintf(tmp, sizeof(buffer), "%d", node->caller_count);
switch_xml_set_attr_d(x_fifo, "caller_count", tmp); switch_xml_set_attr_d(x_fifo, "caller_count", tmp);
switch_snprintf(tmp, sizeof(buffer), "%d", node->waiting_count); switch_snprintf(tmp, sizeof(buffer), "%d", node_consumer_wait_count(node));
switch_xml_set_attr_d(x_fifo, "waiting_count", tmp); switch_xml_set_attr_d(x_fifo, "waiting_count", tmp);
switch_snprintf(tmp, sizeof(buffer), "%u", node->importance);
switch_xml_set_attr_d(x_fifo, "importance", tmp);
cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose); cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose);
cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose); cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose);
} }
#define FIFO_API_SYNTAX "list|count [<fifo name>]" #define FIFO_API_SYNTAX "list|list_verbose|count|importance [<fifo name>]"
SWITCH_STANDARD_API(fifo_api_function) SWITCH_STANDARD_API(fifo_api_function)
{ {
int len = 0; int len = 0;
...@@ -761,7 +999,7 @@ SWITCH_STANDARD_API(fifo_api_function) ...@@ -761,7 +999,7 @@ SWITCH_STANDARD_API(fifo_api_function)
char *xml_text = NULL; char *xml_text = NULL;
switch_xml_t x_report = switch_xml_new("fifo_report"); switch_xml_t x_report = switch_xml_new("fifo_report");
switch_assert(x_report); switch_assert(x_report);
if (argc < 2) { if (argc < 2) {
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
...@@ -783,16 +1021,26 @@ SWITCH_STANDARD_API(fifo_api_function) ...@@ -783,16 +1021,26 @@ SWITCH_STANDARD_API(fifo_api_function)
switch_xml_free(x_report); switch_xml_free(x_report);
switch_safe_free(xml_text); switch_safe_free(xml_text);
} else if (!strcasecmp(argv[0], "importance")) {
if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
int importance = 0;
if (argc > 2) {
importance = atoi(argv[2]);
if (importance < 0) {
importance = 0;
}
node->importance = importance;
}
stream->write_function(stream, "importance: %u\n", node->importance);
} else {
stream->write_function(stream, "no fifo by that name\n");
}
} else if (!strcasecmp(argv[0], "count")) { } else if (!strcasecmp(argv[0], "count")) {
if (argc < 2) { if (argc < 2) {
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
int i = 0;
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
node = (fifo_node_t *) val; node = (fifo_node_t *) val;
len = 0; len = node_consumer_wait_count(node);
for (i = 0; i < MAX_PRI; i++) {
len += switch_queue_size(node->fifo_list[i]);
}
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
stream->write_function(stream, "%s:%d:%d:%d\n", (char *)var, node->consumer_count, node->caller_count, len); stream->write_function(stream, "%s:%d:%d:%d\n", (char *)var, node->consumer_count, node->caller_count, len);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
...@@ -804,12 +1052,7 @@ SWITCH_STANDARD_API(fifo_api_function) ...@@ -804,12 +1052,7 @@ SWITCH_STANDARD_API(fifo_api_function)
} }
} else { } else {
if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
int i = 0; len = node_consumer_wait_count(node);
len = 0;
for (i = 0 ;i < MAX_PRI; i++) {
len += switch_queue_size(node->fifo_list[i]);
}
} }
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len);
......
...@@ -54,6 +54,7 @@ struct local_stream_context { ...@@ -54,6 +54,7 @@ struct local_stream_context {
const char *file; const char *file;
const char *func; const char *func;
int line; int line;
switch_file_handle_t *handle;
struct local_stream_context *next; struct local_stream_context *next;
}; };
...@@ -203,6 +204,9 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void ...@@ -203,6 +204,9 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
switch_mutex_lock(source->mutex); switch_mutex_lock(source->mutex);
for (cp = source->context_list; cp; cp = cp->next) { for (cp = source->context_list; cp; cp = cp->next) {
if (switch_test_flag(cp->handle, SWITCH_FILE_CALLBACK)) {
continue;
}
switch_mutex_lock(cp->audio_mutex); switch_mutex_lock(cp->audio_mutex);
if (switch_buffer_inuse(cp->audio_buffer) > source->samples * 768) { if (switch_buffer_inuse(cp->audio_buffer) > source->samples * 768) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Leaking stream handle! [%s() %s:%d]\n", cp->func, cp->file, cp->line); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Leaking stream handle! [%s() %s:%d]\n", cp->func, cp->file, cp->line);
...@@ -291,7 +295,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons ...@@ -291,7 +295,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
context->file = handle->file; context->file = handle->file;
context->func = handle->func; context->func = handle->func;
context->line = handle->line; context->line = handle->line;
context->handle = handle;
switch_mutex_lock(source->mutex); switch_mutex_lock(source->mutex);
context->next = source->context_list; context->next = source->context_list;
source->context_list = context; source->context_list = context;
......
...@@ -616,6 +616,12 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_collect_digits_callback(switch_core_s ...@@ -616,6 +616,12 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_collect_digits_callback(switch_core_s
if (!SWITCH_READ_ACCEPTABLE(status)) { if (!SWITCH_READ_ACCEPTABLE(status)) {
break; break;
} }
if (args && (args->read_frame_callback)) {
if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) {
break;
}
}
} }
return status; return status;
......
...@@ -1111,7 +1111,13 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess ...@@ -1111,7 +1111,13 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess
} }
if (args && (args->read_frame_callback)) { if (args && (args->read_frame_callback)) {
int ok = 1;
switch_set_flag(fh, SWITCH_FILE_CALLBACK);
if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) { if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) {
ok = 0;
}
switch_clear_flag(fh, SWITCH_FILE_CALLBACK);
if (!ok) {
break; break;
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论