提交 d2e9e46e authored 作者: Tamas Cseke's avatar Tamas Cseke

move session destroy to one place and hangup in others to eliminate races and…

move session destroy to one place and hangup in others to eliminate races and minimize session hash wrlocks FS-3432
上级 83f230cc
...@@ -55,6 +55,8 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename); ...@@ -55,6 +55,8 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename);
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj); static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
static void launch_listener_thread(listener_t *listener); static void launch_listener_thread(listener_t *listener);
session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid);
static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level) static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
{ {
listener_t *l; listener_t *l;
...@@ -135,14 +137,7 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t ...@@ -135,14 +137,7 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
return; return;
} }
if ((s = (session_elem_t*)find_session_elem_by_uuid(listener, uuid))) {
switch_thread_rwlock_rdlock(listener->session_rwlock);
if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
switch_thread_rwlock_rdlock(s->rwlock);
}
switch_thread_rwlock_unlock(listener->session_rwlock);
if (s) {
int send = 0; int send = 0;
switch_thread_rwlock_rdlock(s->event_rwlock); switch_thread_rwlock_rdlock(s->event_rwlock);
...@@ -296,7 +291,7 @@ static void remove_listener(listener_t *listener) ...@@ -296,7 +291,7 @@ static void remove_listener(listener_t *listener)
} }
/* Search for a listener already talking to the specified node and lock for reading*/ /* Search for a listener already talking to the specified node and lock for reading*/
static listener_t *find_listener_locked(char *nodename) static listener_t *find_listener(char *nodename)
{ {
listener_t *l = NULL; listener_t *l = NULL;
...@@ -321,14 +316,9 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s ...@@ -321,14 +316,9 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s
static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element) static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element)
{
switch_core_hash_delete(listener->sessions, session_element->uuid_str);
}
static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
{ {
switch_thread_rwlock_wrlock(listener->session_rwlock); switch_thread_rwlock_wrlock(listener->session_rwlock);
remove_session_elem_from_listener(listener, session_element); switch_core_hash_delete(listener->sessions, session_element->uuid_str);
switch_thread_rwlock_unlock(listener->session_rwlock); switch_thread_rwlock_unlock(listener->session_rwlock);
} }
...@@ -343,10 +333,6 @@ static void destroy_session_elem(session_elem_t *session_element) ...@@ -343,10 +333,6 @@ static void destroy_session_elem(session_elem_t *session_element)
if ((session = switch_core_session_locate(session_element->uuid_str))) { if ((session = switch_core_session_locate(session_element->uuid_str))) {
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
if (switch_channel_get_state(channel) < CS_HANGUP) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Outbound session for %s exited unexpectedly!\n", session_element->uuid_str);
}
switch_channel_set_private(channel, "_erlang_session_", NULL); switch_channel_set_private(channel, "_erlang_session_", NULL);
switch_channel_clear_flag(channel, CF_CONTROLLED); switch_channel_clear_flag(channel, CF_CONTROLLED);
switch_core_session_rwunlock(session); switch_core_session_rwunlock(session);
...@@ -354,6 +340,19 @@ static void destroy_session_elem(session_elem_t *session_element) ...@@ -354,6 +340,19 @@ static void destroy_session_elem(session_elem_t *session_element)
switch_core_destroy_memory_pool(&session_element->pool); switch_core_destroy_memory_pool(&session_element->pool);
} }
session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid)
{
session_elem_t *session = NULL;
switch_thread_rwlock_rdlock(listener->session_rwlock);
if ((session = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
switch_thread_rwlock_rdlock(session->rwlock);
}
switch_thread_rwlock_unlock(listener->session_rwlock);
return session;
}
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
{ {
...@@ -362,14 +361,17 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) ...@@ -362,14 +361,17 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
void *val = NULL; void *val = NULL;
session_elem_t *session = NULL; session_elem_t *session = NULL;
switch_thread_rwlock_rdlock(listener->session_rwlock);
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &val); switch_hash_this(iter, &key, NULL, &val);
if (((session_elem_t*)val)->process.type == ERLANG_PID && !ei_compare_pids(pid, &((session_elem_t*)val)->process.pid)) { if (((session_elem_t*)val)->process.type == ERLANG_PID && !ei_compare_pids(pid, &((session_elem_t*)val)->process.pid)) {
session = (session_elem_t*)val; session = (session_elem_t*)val;
switch_thread_rwlock_rdlock(session->rwlock);
break; break;
} }
} }
switch_thread_rwlock_unlock(listener->session_rwlock);
return session; return session;
} }
...@@ -536,6 +538,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t * ...@@ -536,6 +538,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t *
if (!(session = switch_core_session_locate(session_element->uuid_str))) { if (!(session = switch_core_session_locate(session_element->uuid_str))) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Can't locate session %s\n", session_element->uuid_str); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Can't locate session %s\n", session_element->uuid_str);
switch_event_destroy(&call_event);
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
...@@ -672,19 +675,16 @@ static switch_status_t check_attached_sessions(listener_t *listener) ...@@ -672,19 +675,16 @@ static switch_status_t check_attached_sessions(listener_t *listener)
} }
} }
switch_thread_rwlock_unlock(listener->session_rwlock); switch_thread_rwlock_unlock(listener->session_rwlock);
/* release the read lock and get a write lock */
switch_thread_rwlock_wrlock(listener->session_rwlock);
/* do the deferred remove */
/* do the deferred remove */
for (header = event->headers; header; header = header->next) { for (header = event->headers; header; header = header->next) {
if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) { if ((sp = (session_elem_t*)find_session_elem_by_uuid(listener, header->value))) {
remove_session_elem_from_listener(listener, sp); remove_session_elem_from_listener(listener, sp);
switch_thread_rwlock_unlock(sp->rwlock);
destroy_session_elem(sp); destroy_session_elem(sp);
} }
} }
switch_thread_rwlock_unlock(listener->session_rwlock);
/* remove the temporary event */ /* remove the temporary event */
switch_event_destroy(&event); switch_event_destroy(&event);
...@@ -783,13 +783,23 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) ...@@ -783,13 +783,23 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
remove_binding(NULL, pid); /* TODO - why don't we pass the listener as the first argument? */ remove_binding(NULL, pid); /* TODO - why don't we pass the listener as the first argument? */
/* TODO - eliminate session destroy races and we shouldn't lock the session hash */
switch_thread_rwlock_wrlock(listener->session_rwlock);
if ((s = find_session_elem_by_pid(listener, pid))) { if ((s = find_session_elem_by_pid(listener, pid))) {
remove_session_elem_from_listener(listener, s); switch_core_session_t *session = NULL;
destroy_session_elem(s);
if ((session = switch_core_session_locate(s->uuid_str))) {
switch_channel_t *channel = switch_core_session_get_channel(session);
if (switch_channel_get_state(channel) < CS_HANGUP) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Outbound session exited unexpectedly %s!\n", s->uuid_str);
}
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_core_session_rwunlock(session);
}
switch_thread_rwlock_unlock(s->rwlock);
} }
switch_thread_rwlock_wrlock(listener->session_rwlock);
if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) { if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) {
...@@ -1214,7 +1224,6 @@ static listener_t *new_outbound_listener_locked(char *node) ...@@ -1214,7 +1224,6 @@ static listener_t *new_outbound_listener_locked(char *node)
return NULL; return NULL;
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
listener = new_listener(&ec, clientfd); listener = new_listener(&ec, clientfd);
listener->peer_nodename = switch_core_strdup(listener->pool, node); listener->peer_nodename = switch_core_strdup(listener->pool, node);
} }
...@@ -1410,10 +1419,11 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul ...@@ -1410,10 +1419,11 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000); switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000);
if (!p->pid) { if (!p->pid) {
switch_channel_t *channel = switch_core_session_get_channel(session);
p->state = reply_timeout; p->state = reply_timeout;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str);
remove_session_elem_from_listener_locked(listener, session_element); switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
destroy_session_elem(session_element);
return NULL; return NULL;
} }
...@@ -1500,7 +1510,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function) ...@@ -1500,7 +1510,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node);
/* first work out if there is a listener already talking to the node we want to talk to */ /* first work out if there is a listener already talking to the node we want to talk to */
listener = find_listener_locked(node); listener = find_listener(node);
/* if there is no listener, then create one */ /* if there is no listener, then create one */
if (!listener) { if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n"); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
...@@ -1567,7 +1577,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) ...@@ -1567,7 +1577,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
ei_x_encode_atom(&buf, "freeswitch_sendmsg"); ei_x_encode_atom(&buf, "freeswitch_sendmsg");
_ei_x_encode_string(&buf, argv[2]); _ei_x_encode_string(&buf, argv[2]);
listener = find_listener_locked(node); listener = find_listener(node);
if (!listener) { if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
listener = new_outbound_listener_locked(node); listener = new_outbound_listener_locked(node);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论