提交 a4080cc4 authored 作者: dingding's avatar dingding

FS-9583 [mod_fifo] fix node_thread_run blocking outbound_ringall_thread_run and…

FS-9583 [mod_fifo] fix node_thread_run blocking outbound_ringall_thread_run and outbound_enterprise_thread_run
上级 4002bda2
...@@ -2011,9 +2011,10 @@ static int place_call_enterprise_callback(void *pArg, int argc, char **argv, cha ...@@ -2011,9 +2011,10 @@ static int place_call_enterprise_callback(void *pArg, int argc, char **argv, cha
* effect. outbound_per_cycle and outbound_per_cycle_min both default to 1. * effect. outbound_per_cycle and outbound_per_cycle_min both default to 1.
* *
*/ */
static void find_consumers(fifo_node_t *node) static int find_consumers(fifo_node_t *node)
{ {
char *sql; char *sql;
int ret = 0;
sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, " sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
"next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname " "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
...@@ -2027,6 +2028,7 @@ static void find_consumers(fifo_node_t *node) ...@@ -2027,6 +2028,7 @@ static void find_consumers(fifo_node_t *node)
case NODE_STRATEGY_ENTERPRISE: case NODE_STRATEGY_ENTERPRISE:
{ {
int need = node_caller_count(node); int need = node_caller_count(node);
int count;
if (node->outbound_per_cycle && node->outbound_per_cycle < need) { if (node->outbound_per_cycle && node->outbound_per_cycle < need) {
need = node->outbound_per_cycle; need = node->outbound_per_cycle;
...@@ -2034,7 +2036,9 @@ static void find_consumers(fifo_node_t *node) ...@@ -2034,7 +2036,9 @@ static void find_consumers(fifo_node_t *node)
need = node->outbound_per_cycle_min; need = node->outbound_per_cycle_min;
} }
count = need;
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need); fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need);
ret = count - need;
} }
break; break;
case NODE_STRATEGY_RINGALL: case NODE_STRATEGY_RINGALL:
...@@ -2056,6 +2060,7 @@ static void find_consumers(fifo_node_t *node) ...@@ -2056,6 +2060,7 @@ static void find_consumers(fifo_node_t *node)
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh); fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
if (cbh->rowcount) { if (cbh->rowcount) {
ret = cbh->rowcount;
switch_threadattr_create(&thd_attr, cbh->pool); switch_threadattr_create(&thd_attr, cbh->pool);
switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
...@@ -2070,6 +2075,7 @@ static void find_consumers(fifo_node_t *node) ...@@ -2070,6 +2075,7 @@ static void find_consumers(fifo_node_t *node)
} }
switch_safe_free(sql); switch_safe_free(sql);
return ret;
} }
/*\brief Continuously attempt to deliver calls to outbound members /*\brief Continuously attempt to deliver calls to outbound members
...@@ -2094,7 +2100,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o ...@@ -2094,7 +2100,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
globals.node_thread_running = 1; globals.node_thread_running = 1;
while (globals.node_thread_running == 1) { while (globals.node_thread_running == 1) {
int ppl_waiting, consumer_total, idle_consumers, found = 0; int ppl_waiting, consumer_total, idle_consumers, need_sleep = 0;
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
...@@ -2159,9 +2165,9 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o ...@@ -2159,9 +2165,9 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
} }
if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
found++; if (find_consumers(this_node)) {
find_consumers(this_node); need_sleep++;
switch_yield(1000000); }
} }
} }
} }
...@@ -2172,8 +2178,9 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o ...@@ -2172,8 +2178,9 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
if (cur_priority == 1) { if (cur_priority == 1 || need_sleep) {
switch_yield(1000000); switch_yield(1000000);
need_sleep = 0;
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论