提交 b6c7dd36 authored 作者: Andrew Thompson's avatar Andrew Thompson

Merge in Rob Charlton's patch for outbound session support in mod_erlang_event


git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11376 d0543943-73ff-0310-b7d9-9358b9ac24b2
上级 6bc24b8c
BASE=../../../.. BASE=../../../..
LOCAL_SOURCES=handle_msg.c ei_helpers.c
LOCAL_OBJS=handle_msg.o ei_helpers.o
include $(BASE)/build/modmake.rules include $(BASE)/build/modmake.rules
LOCAL_CFLAGS=-I/usr/local/lib/erlang/lib/erl_interface-3.5.8/include -L/usr/local/lib/erlang/lib/erl_interface-3.5.8/lib/ -D_REENTRANT
LOCAL_CFLAGS=-I/usr/local/lib/erlang/lib/erl_interface-3.5.9/include -L/usr/local/lib/erlang/lib/erl_interface-3.5.9/lib/ -D_REENTRANT
LOCAL_LDFLAGS=-lei LOCAL_LDFLAGS=-lei
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005/2006, Anthony Minessale II <anthmct@yahoo.com>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthmct@yahoo.com>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Anthony Minessale II <anthmct@yahoo.com>
* Andrew Thompson <andrew@hijacked.us>
* Rob Charlton <rob.charlton@savageminds.com>
*
*
* ei_helpers.c -- helper functions for ei
*
*/
#include <switch.h>
#include <ei.h>
#include "mod_erlang_event.h"
/* Stolen from code added to ei in R12B-5.
* Since not everyone has this version yet;
* provide our own version.
* */
#define put8(s,n) do { \
(s)[0] = (char)((n) & 0xff); \
(s) += 1; \
} while (0)
#define put32be(s,n) do { \
(s)[0] = ((n) >> 24) & 0xff; \
(s)[1] = ((n) >> 16) & 0xff; \
(s)[2] = ((n) >> 8) & 0xff; \
(s)[3] = (n) & 0xff; \
(s) += 4; \
} while (0)
void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) {
char msgbuf[2048];
char *s;
int index = 0;
/*int n;*/
index = 5; /* max sizes: */
ei_encode_version(msgbuf,&index); /* 1 */
ei_encode_tuple_header(msgbuf,&index,3);
ei_encode_long(msgbuf,&index,ERL_LINK);
ei_encode_pid(msgbuf,&index,from); /* 268 */
ei_encode_pid(msgbuf,&index,to); /* 268 */
/* 5 byte header missing */
s = msgbuf;
put32be(s, index - 4); /* 4 */
put8(s, ERL_PASS_THROUGH); /* 1 */
/* sum: 542 */
switch_mutex_lock(listener->sock_mutex);
write(listener->sockfd, msgbuf, index);
switch_mutex_unlock(listener->sock_mutex);
}
void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event)
{
int i;
char *uuid = switch_event_get_header(event, "unique-id");
switch_event_header_t *hp;
for (i = 0, hp = event->headers; hp; hp = hp->next, i++);
if (event->body)
i++;
ei_x_encode_list_header(ebuf, i+1);
if (uuid) {
ei_x_encode_string(ebuf, switch_event_get_header(event, "unique-id"));
} else {
ei_x_encode_atom(ebuf, "undefined");
}
for (hp = event->headers; hp; hp = hp->next) {
ei_x_encode_tuple_header(ebuf, 2);
ei_x_encode_string(ebuf, hp->name);
ei_x_encode_string(ebuf, hp->value);
}
if (event->body) {
ei_x_encode_tuple_header(ebuf, 2);
ei_x_encode_string(ebuf, "body");
ei_x_encode_string(ebuf, event->body);
}
ei_x_encode_empty_list(ebuf);
}
void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag)
{
ei_x_encode_tuple_header(ebuf, 2);
ei_x_encode_atom(ebuf, tag);
ei_encode_switch_event_headers(ebuf, event);
}
switch_status_t initialise_ei(struct ei_cnode_s *ec)
{
switch_status_t rv;
struct sockaddr_in server_addr;
/* zero out the struct before we use it */
memset(&server_addr, 0, sizeof(server_addr));
/* convert the configured IP to network byte order, handing errors */
rv = inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr);
if (rv == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip);
return SWITCH_STATUS_FALSE;
} else if (rv == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno));
return SWITCH_STATUS_FALSE;
}
/* set the address family and port */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(prefs.port);
struct hostent *nodehost = gethostbyaddr(&server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET);
char *thishostname = nodehost->h_name;
char thisnodename[MAXNODELEN+1];
if (!strcmp(thishostname, "localhost"))
gethostname(thishostname, EI_MAXHOSTNAMELEN);
if (prefs.shortname) {
char *off;
if ((off = strchr(thishostname, '.'))) {
*off = '\0';
}
}
snprintf(thisnodename, MAXNODELEN+1, "%s@%s", prefs.nodename, thishostname);
/* init the ei stuff */
if (ei_connect_xinit(ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n");
return SWITCH_STATUS_FALSE;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ei initialized at %s\n", thisnodename);
return SWITCH_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4:
*/
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005/2006, Anthony Minessale II <anthmct@yahoo.com>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthmct@yahoo.com>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Anthony Minessale II <anthmct@yahoo.com>
* Andrew Thompson <andrew@hijacked.us>
* Rob Charlton <rob.charlton@savageminds.com>
*
*
* handle_msg.c -- handle messages received from erlang nodes
*
*/
#include <switch.h>
#include <ei.h>
#include "mod_erlang_event.h"
static char *MARKER = "1";
static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
{
switch_bool_t r = SWITCH_TRUE;
struct api_command_struct *acs = (struct api_command_struct *) obj;
switch_stream_handle_t stream = { 0 };
char *reply, *freply = NULL;
switch_status_t status;
if (!acs) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n");
return NULL;
}
if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n");
goto done;
}
SWITCH_STANDARD_STREAM(stream);
if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) {
reply = stream.data;
} else {
freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd);
reply = freply;
r = SWITCH_FALSE;
}
if (!reply) {
reply = "Command returned no output!";
r = SWITCH_FALSE;
}
if (*reply == '-')
r = SWITCH_FALSE;
if (acs->bg) {
switch_event_t *event;
if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd);
ei_x_buff ebuf;
ei_x_new_with_version(&ebuf);
if (acs->arg) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg);
}
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false");
switch_event_add_body(event, "%s", reply);
switch_event_fire(&event);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node);
ei_x_encode_tuple_header(&ebuf, 3);
if (r)
ei_x_encode_atom(&ebuf, "bgok");
else
ei_x_encode_atom(&ebuf, "bgerror");
ei_x_encode_string(&ebuf, acs->uuid_str);
ei_x_encode_string(&ebuf, reply);
switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&ebuf);
}
} else {
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
ei_x_encode_tuple_header(&rbuf, 2);
if (!strlen(reply)) {
reply = "Command returned no output!";
r = SWITCH_FALSE;
}
if (r) {
ei_x_encode_atom(&rbuf, "ok");
} else {
ei_x_encode_atom(&rbuf, "error");
}
ei_x_encode_string(&rbuf, reply);
switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&rbuf);
}
switch_safe_free(stream.data);
switch_safe_free(freply);
if (acs->listener->rwlock) {
switch_thread_rwlock_unlock(acs->listener->rwlock);
}
done:
if (acs->bg) {
switch_memory_pool_t *pool = acs->pool;
acs = NULL;
switch_core_destroy_memory_pool(&pool);
pool = NULL;
}
return NULL;
}
static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *buf, ei_x_buff *rbuf)
{
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
if (ei_decode_string(buf->buff, &buf->index, uuid_str)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(nbuf));
nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz);
memcpy(nbuf->buff, buf->buff, buf->buffsz);
nbuf->index = buf->index;
nbuf->buffsz = buf->buffsz;
switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf);
ei_x_encode_atom(rbuf, "ok");
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_set_log_level(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
switch_log_level_t ltype = SWITCH_LOG_DEBUG;
char loglevelstr[MAXATOMLEN];
if (arity != 2 ||
ei_decode_atom(buf->buff, &buf->index, loglevelstr)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
ltype = switch_log_str2level(loglevelstr);
if (ltype && ltype != SWITCH_LOG_INVALID) {
listener->level = ltype;
ei_x_encode_atom(rbuf, "ok");
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char atom[MAXATOMLEN];
if (arity == 1) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
int custom = 0;
switch_event_types_t type;
if (!switch_test_flag(listener, LFLAG_EVENTS)) {
switch_set_flag_locked(listener, LFLAG_EVENTS);
}
for (int i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
if (custom) {
switch_core_hash_insert(listener->event_hash, atom, MARKER);
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
if (type == SWITCH_EVENT_ALL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n");
uint32_t x = 0;
for (x = 0; x < SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 1;
}
}
if (type <= SWITCH_EVENT_ALL) {
listener->event_list[type] = 1;
}
if (type == SWITCH_EVENT_CUSTOM) {
custom++;
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
}
}
ei_x_encode_atom(rbuf, "ok");
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char atom[MAXATOMLEN];
if (arity == 1) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
int custom = 0;
switch_event_types_t type;
for (int i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
if (custom) {
switch_core_hash_delete(listener->event_hash, atom);
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
uint32_t x = 0;
if (type == SWITCH_EVENT_CUSTOM) {
custom++;
} else if (type == SWITCH_EVENT_ALL) {
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 0;
}
} else {
if (listener->event_list[SWITCH_EVENT_ALL]) {
listener->event_list[SWITCH_EVENT_ALL] = 0;
for (x = 0; x < SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 1;
}
}
listener->event_list[type] = 0;
}
}
}
}
ei_x_encode_atom(rbuf, "ok");
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_api(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char api_cmd[MAXATOMLEN];
char arg[1024];
if (arity < 3 ||
ei_decode_atom(buf->buff, &buf->index, api_cmd) ||
ei_decode_string(buf->buff, &buf->index, arg)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
return SWITCH_STATUS_SUCCESS;
}
else {
struct api_command_struct acs = { 0 };
acs.listener = listener;
acs.api_cmd = api_cmd;
acs.arg = arg;
acs.bg = 0;
acs.pid = msg->from;
api_exec(NULL, (void *) &acs);
/* don't reply */
return SWITCH_STATUS_FALSE;
}
}
static switch_status_t handle_msg_bgapi(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char api_cmd[MAXATOMLEN];
char arg[1024];
if (arity < 3 ||
ei_decode_atom(buf->buff, &buf->index, api_cmd) ||
ei_decode_string(buf->buff, &buf->index, arg)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
struct api_command_struct *acs = NULL;
switch_memory_pool_t *pool;
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_uuid_t uuid;
switch_core_new_memory_pool(&pool);
acs = switch_core_alloc(pool, sizeof(*acs));
switch_assert(acs);
acs->pool = pool;
acs->listener = listener;
acs->api_cmd = switch_core_strdup(acs->pool, api_cmd);
acs->arg = switch_core_strdup(acs->pool, arg);
acs->bg = 1;
acs->pid = msg->from;
switch_threadattr_create(&thd_attr, acs->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_uuid_get(&uuid);
switch_uuid_format(acs->uuid_str, &uuid);
switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "ok");
ei_x_encode_string(rbuf, acs->uuid_str);
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char ename[MAXATOMLEN];
int headerlength;
if (ei_decode_atom(buf->buff, &buf->index, ename) ||
ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
switch_event_types_t etype;
if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) {
switch_event_t *event;
if (switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) {
char key[1024];
char value[1024];
int i = 0;
switch_bool_t fail = SWITCH_FALSE;
while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
i++;
if (ei_decode_string(buf->buff, &buf->index, key) ||
ei_decode_string(buf->buff, &buf->index, value)) {
fail = SWITCH_TRUE;
break;
}
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
}
if (headerlength != i || fail) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
switch_event_fire(&event);
ei_x_encode_atom(rbuf, "ok");
}
}
}
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char uuid[37];
int headerlength;
if (ei_decode_string(buf->buff, &buf->index, uuid) ||
ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
switch_core_session_t *session;
if (!switch_strlen_zero(uuid) && (session = switch_core_session_locate(uuid))) {
switch_event_t *event;
if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) {
char key[1024];
char value[1024];
int i = 0;
switch_bool_t fail = SWITCH_FALSE;
while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
i++;
if (ei_decode_string(buf->buff, &buf->index, key) ||
ei_decode_string(buf->buff, &buf->index, value)) {
fail = SWITCH_TRUE;
break;
}
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
}
if (headerlength != i || fail) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
if (switch_core_session_queue_private_event(session, &event) == SWITCH_STATUS_SUCCESS) {
ei_x_encode_atom(rbuf, "ok");
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badmem");
}
}
}
/* release the lock returned by switch_core_locate_session */
switch_core_session_rwunlock(session);
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "nosession");
}
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
/* format is (result|config|directory|dialplan|phrases) */
char sectionstr[MAXATOMLEN];
switch_xml_section_t section;
if (ei_decode_atom(buf->buff, &buf->index, sectionstr) ||
!(section = switch_xml_parse_section_string(sectionstr))) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
struct erlang_binding *binding, *ptr;
if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badmem");
}
else {
binding->section = section;
binding->pid = msg->from;
binding->listener = listener;
switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
switch_mutex_lock(globals.listener_mutex);
for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
if (ptr) {
ptr->next = binding;
} else {
bindings.head = binding;
}
switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
switch_mutex_unlock(globals.listener_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
ei_link(listener, ei_self(listener->ec), &msg->from);
ei_x_encode_atom(rbuf, "ok");
}
}
return SWITCH_STATUS_SUCCESS;
}
/* {handlecall,<uuid>,<handler process registered name>} */
static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char reg_name[MAXATOMLEN];
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
if (arity != 3 ||
ei_decode_string(buf->buff, &buf->index, uuid_str) ||
ei_decode_string(buf->buff, &buf->index, reg_name)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
switch_core_session_t *session;
if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) {
/* create a new sesion list element and attach it to this listener */
if (attach_call_to_listener(listener,reg_name,session)) {
ei_x_encode_atom(rbuf, "ok");
}
else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badsession");
}
}
else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
char tupletag[MAXATOMLEN];
int arity;
switch_status_t ret = SWITCH_STATUS_SUCCESS;
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
if (ei_decode_atom(buf->buff, &buf->index, tupletag)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else {
if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) {
ret = handle_msg_fetch_reply(listener,buf,rbuf);
} else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) {
ret = handle_msg_set_log_level(listener,arity,buf,rbuf);
} else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
ret = handle_msg_event(listener,arity,buf,rbuf);
} else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {
ret = handle_msg_nixevent(listener,arity,buf,rbuf);
} else if (!strncmp(tupletag, "api", MAXATOMLEN)) {
ret = handle_msg_api(listener,msg,arity,buf,rbuf);
} else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) {
ret = handle_msg_bgapi(listener,msg,arity,buf,rbuf);
} else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) {
ret = handle_msg_sendevent(listener,arity,buf,rbuf);
} else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) {
ret = handle_msg_sendmsg(listener,arity,buf,rbuf);
} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
ret = handle_msg_bind(listener,msg,buf,rbuf);
} else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) {
ret = handle_msg_handlecall(listener,arity,buf,rbuf);
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
}
}
return ret;
}
static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
char atom[MAXATOMLEN];
switch_status_t ret = SWITCH_STATUS_SUCCESS;
if (ei_decode_atom(buf->buff, &buf->index, atom)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
}
else if (!strncmp(atom, "nolog", MAXATOMLEN)) {
if (switch_test_flag(listener, LFLAG_LOG)) {
switch_clear_flag_locked(listener, LFLAG_LOG);
}
ei_x_encode_atom(rbuf, "ok");
} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
ei_link(listener, ei_self(listener->ec), &msg->from);
listener->log_pid = msg->from;
listener->level = SWITCH_LOG_DEBUG;
switch_set_flag(listener, LFLAG_LOG);
ei_x_encode_atom(rbuf, "ok");
} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
ei_link(listener, ei_self(listener->ec), &msg->from);
listener->event_pid = msg->from;
if (!switch_test_flag(listener, LFLAG_EVENTS)) {
switch_set_flag_locked(listener, LFLAG_EVENTS);
}
ei_x_encode_atom(rbuf, "ok");
} else if (!strncmp(atom, "noevents", MAXATOMLEN)) {
void *pop;
/*purge the event queue */
while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
if (switch_test_flag(listener, LFLAG_EVENTS)) {
uint8_t x = 0;
switch_clear_flag_locked(listener, LFLAG_EVENTS);
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 0;
}
/* wipe the hash */
switch_core_hash_destroy(&listener->event_hash);
switch_core_hash_init(&listener->event_hash, listener->pool);
ei_x_encode_atom(rbuf, "ok");
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "notlistening");
}
} else if (!strncmp(atom, "exit", MAXATOMLEN)) {
ei_x_encode_atom(rbuf, "ok");
ret = SWITCH_STATUS_TERM;
} else if (!strncmp(atom, "getpid", MAXATOMLEN)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "ok");
ei_x_encode_pid(rbuf, ei_self(listener->ec));
} else if (!strncmp(atom, "link", MAXATOMLEN)) {
/* debugging */
ei_link(listener, ei_self(listener->ec), &msg->from);
ret = SWITCH_STATUS_FALSE;
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
}
return ret;
}
int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
int type, size, version;
switch_status_t ret = SWITCH_STATUS_SUCCESS;
buf->index = 0;
ei_decode_version(buf->buff, &buf->index, &version);
ei_get_type(buf->buff, &buf->index, &type, &size);
switch(type) {
case ERL_SMALL_TUPLE_EXT :
case ERL_LARGE_TUPLE_EXT :
ret = handle_msg_tuple(listener,msg,buf,rbuf);
break;
case ERL_ATOM_EXT :
ret = handle_msg_atom(listener,msg,buf,rbuf);
break;
default :
/* some other kind of erlang term */
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
break;
}
if (SWITCH_STATUS_FALSE==ret)
return 0;
else {
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex);
if (SWITCH_STATUS_SUCCESS==ret)
return 0;
else /* SWITCH_STATUS_TERM */
return 1;
}
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4:
*/
...@@ -25,101 +25,22 @@ ...@@ -25,101 +25,22 @@
* *
* Anthony Minessale II <anthmct@yahoo.com> * Anthony Minessale II <anthmct@yahoo.com>
* Andrew Thompson <andrew@hijacked.us> * Andrew Thompson <andrew@hijacked.us>
* Rob Charlton <rob.charlton@savageminds.com>
* *
* *
* mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket * mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket
* *
*/ */
#include <switch.h> #include <switch.h>
#include <ei.h> #include <ei.h>
#define DEFINE_GLOBALS
#include "mod_erlang_event.h"
SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load); SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime); SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime);
SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime); SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime);
static char *MARKER = "1";
typedef enum {
LFLAG_AUTHED = (1 << 0),
LFLAG_RUNNING = (1 << 1),
LFLAG_EVENTS = (1 << 2),
LFLAG_LOG = (1 << 3),
LFLAG_FULL = (1 << 4),
LFLAG_MYEVENTS = (1 << 5),
LFLAG_SESSION = (1 << 6),
LFLAG_ASYNC = (1 << 7),
LFLAG_STATEFUL = (1 << 8)
} event_flag_t;
struct listener {
int sockfd;
struct ei_cnode_s *ec;
erlang_pid log_pid;
erlang_pid event_pid;
switch_queue_t *event_queue;
switch_queue_t *log_queue;
switch_memory_pool_t *pool;
switch_mutex_t *flag_mutex;
switch_mutex_t *sock_mutex;
char *ebuf;
uint32_t flags;
switch_log_level_t level;
uint8_t event_list[SWITCH_EVENT_ALL + 1];
switch_hash_t *event_hash;
switch_hash_t *fetch_reply_hash;
switch_thread_rwlock_t *rwlock;
switch_core_session_t *session;
int lost_events;
int lost_logs;
time_t last_flush;
uint32_t timeout;
uint32_t id;
char remote_ip[50];
/*switch_port_t remote_port;*/
struct listener *next;
};
typedef struct listener listener_t;
static struct {
int sockfd;
switch_mutex_t *sock_mutex;
listener_t *listeners;
uint8_t ready;
} listen_list;
#define MAX_ACL 100
struct erlang_binding {
switch_xml_section_t section;
erlang_pid pid;
char *registered_process; /* TODO */
listener_t *listener;
struct erlang_binding *next;
};
static struct {
struct erlang_binding *head;
switch_xml_binding_t *search_binding;
} bindings;
static struct {
switch_mutex_t *mutex;
char *ip;
char *nodename;
switch_bool_t shortname;
uint16_t port;
char *cookie;
int done;
int threads;
char *acl[MAX_ACL];
uint32_t acl_count;
uint32_t id;
} prefs;
static void remove_listener(listener_t *listener); static void remove_listener(listener_t *listener);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
...@@ -129,12 +50,6 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename); ...@@ -129,12 +50,6 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename);
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj); static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
static void launch_listener_thread(listener_t *listener); static void launch_listener_thread(listener_t *listener);
static struct {
switch_mutex_t *listener_mutex;
switch_event_node_t *node;
} globals;
static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level) static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
{ {
listener_t *l; listener_t *l;
...@@ -173,49 +88,6 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l ...@@ -173,49 +88,6 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
} }
/* Stolen from code added to ei in R12B-5.
* Since not everyone has this verison yet;
* provide our own version.
* */
#define put8(s,n) do { \
(s)[0] = (char)((n) & 0xff); \
(s) += 1; \
} while (0)
#define put32be(s,n) do { \
(s)[0] = ((n) >> 24) & 0xff; \
(s)[1] = ((n) >> 16) & 0xff; \
(s)[2] = ((n) >> 8) & 0xff; \
(s)[3] = (n) & 0xff; \
(s) += 4; \
} while (0)
static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) {
char msgbuf[2048];
char *s;
int index = 0;
/*int n;*/
index = 5; /* max sizes: */
ei_encode_version(msgbuf,&index); /* 1 */
ei_encode_tuple_header(msgbuf,&index,3);
ei_encode_long(msgbuf,&index,ERL_LINK);
ei_encode_pid(msgbuf,&index,from); /* 268 */
ei_encode_pid(msgbuf,&index,to); /* 268 */
/* 5 byte header missing */
s = msgbuf;
put32be(s, index - 4); /* 4 */
put8(s, ERL_PASS_THROUGH); /* 1 */
/* sum: 542 */
switch_mutex_lock(listener->sock_mutex);
write(listener->sockfd, msgbuf, index);
switch_mutex_unlock(listener->sock_mutex);
}
static void expire_listener(listener_t **listener) static void expire_listener(listener_t **listener)
{ {
void *pop; void *pop;
...@@ -268,54 +140,33 @@ static void remove_binding(listener_t *listener, erlang_pid *pid) { ...@@ -268,54 +140,33 @@ static void remove_binding(listener_t *listener, erlang_pid *pid) {
} }
static void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event) static void send_event_to_attached_sessions(listener_t* listener, switch_event_t *event)
{ {
int i;
char *uuid = switch_event_get_header(event, "unique-id"); char *uuid = switch_event_get_header(event, "unique-id");
switch_event_t *clone = NULL;
session_elem_t* s;
switch_event_header_t *hp; if (!uuid)
return;
for (i = 0, hp = event->headers; hp; hp = hp->next, i++); switch_mutex_lock(listener->session_mutex);
for (s = listener->session_list; s; s = s->next) {
if (event->body) /* check the event uuid against the uuid of each session */
i++; if (!strcmp(uuid, switch_core_session_get_uuid(s->session))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending event to attached session\n");
ei_x_encode_list_header(ebuf, i+1); if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
/* add the event to the queue for this session */
if (uuid) { if (switch_queue_trypush(s->event_queue, clone) != SWITCH_STATUS_SUCCESS) {
ei_x_encode_string(ebuf, switch_event_get_header(event, "unique-id")); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Lost event!\n");
switch_event_destroy(&clone);
}
} else { } else {
ei_x_encode_atom(ebuf, "undefined"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
} }
for (hp = event->headers; hp; hp = hp->next) {
ei_x_encode_tuple_header(ebuf, 2);
ei_x_encode_string(ebuf, hp->name);
ei_x_encode_string(ebuf, hp->value);
} }
if (event->body) {
ei_x_encode_tuple_header(ebuf, 2);
ei_x_encode_string(ebuf, "body");
ei_x_encode_string(ebuf, event->body);
} }
switch_mutex_unlock(listener->session_mutex);
ei_x_encode_empty_list(ebuf);
}
static void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag)
{
ei_x_encode_tuple_header(ebuf, 2);
ei_x_encode_atom(ebuf, tag);
ei_encode_switch_event_headers(ebuf, event);
} }
#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event")
static void event_handler(switch_event_t *event) static void event_handler(switch_event_t *event)
{ {
switch_event_t *clone = NULL; switch_event_t *clone = NULL;
...@@ -336,6 +187,11 @@ static void event_handler(switch_event_t *event) ...@@ -336,6 +187,11 @@ static void event_handler(switch_event_t *event)
l = lp; l = lp;
lp = lp->next; lp = lp->next;
/* test all of the sessions attached to this event in case
one of them should receive it as well
*/
send_event_to_attached_sessions(l,event);
if (!switch_test_flag(l, LFLAG_EVENTS)) { if (!switch_test_flag(l, LFLAG_EVENTS)) {
continue; continue;
} }
...@@ -355,13 +211,6 @@ static void event_handler(switch_event_t *event) ...@@ -355,13 +211,6 @@ static void event_handler(switch_event_t *event)
} }
} }
if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
char *uuid = switch_event_get_header(event, "unique-id");
if (!uuid || strcmp(uuid, switch_core_session_get_uuid(l->session))) {
send = 0;
}
}
if (send) { if (send) {
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
if (switch_queue_trypush(l->event_queue, clone) == SWITCH_STATUS_SUCCESS) { if (switch_queue_trypush(l->event_queue, clone) == SWITCH_STATUS_SUCCESS) {
...@@ -429,134 +278,27 @@ static void remove_listener(listener_t *listener) ...@@ -429,134 +278,27 @@ static void remove_listener(listener_t *listener)
switch_mutex_unlock(globals.listener_mutex); switch_mutex_unlock(globals.listener_mutex);
} }
/* Search for a listener already talking to the specified node */
struct api_command_struct { static listener_t * find_listener(char* nodename)
char *api_cmd;
char *arg;
listener_t *listener;
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
uint8_t bg;
erlang_pid pid;
switch_memory_pool_t *pool;
};
static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
{ {
switch_bool_t r = SWITCH_TRUE; listener_t *l = NULL;
struct api_command_struct *acs = (struct api_command_struct *) obj;
switch_stream_handle_t stream = { 0 };
char *reply, *freply = NULL;
switch_status_t status;
if (!acs) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n");
return NULL;
}
if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n");
goto done;
}
SWITCH_STANDARD_STREAM(stream);
if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) {
reply = stream.data;
} else {
freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd);
reply = freply;
r = SWITCH_FALSE;
}
if (!reply) {
reply = "Command returned no output!";
r = SWITCH_FALSE;
}
if (*reply == '-')
r = SWITCH_FALSE;
if (acs->bg) {
switch_event_t *event;
if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd);
ei_x_buff ebuf;
ei_x_new_with_version(&ebuf);
if (acs->arg) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg);
}
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false"); switch_mutex_lock(globals.listener_mutex);
switch_event_add_body(event, "%s", reply); for (l = listen_list.listeners; l; l = l->next) {
if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
switch_event_fire(&event); break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node);
ei_x_encode_tuple_header(&ebuf, 3);
if (r)
ei_x_encode_atom(&ebuf, "bgok");
else
ei_x_encode_atom(&ebuf, "bgerror");
ei_x_encode_string(&ebuf, acs->uuid_str);
ei_x_encode_string(&ebuf, reply);
switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&ebuf);
}
} else {
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
ei_x_encode_tuple_header(&rbuf, 2);
if (!strlen(reply)) {
reply = "Command returned no output!";
r = SWITCH_FALSE;
}
if (r) {
ei_x_encode_atom(&rbuf, "ok");
} else {
ei_x_encode_atom(&rbuf, "error");
}
ei_x_encode_string(&rbuf, reply);
switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&rbuf);
}
switch_safe_free(stream.data);
switch_safe_free(freply);
if (acs->listener->rwlock) {
switch_thread_rwlock_unlock(acs->listener->rwlock);
} }
done:
if (acs->bg) {
switch_memory_pool_t *pool = acs->pool;
acs = NULL;
switch_core_destroy_memory_pool(&pool);
pool = NULL;
} }
return NULL; switch_mutex_unlock(globals.listener_mutex);
return l;
}
static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element)
{
switch_mutex_lock(listener->session_mutex);
session_element->next = listener->session_list;
listener->session_list = session_element;
switch_mutex_unlock(listener->session_mutex);
} }
static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value, static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value,
...@@ -650,577 +392,193 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c ...@@ -650,577 +392,193 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
} }
static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) static switch_status_t notify_new_session(listener_t *listener, switch_core_session_t *session, char* reg_name)
{ {
int type, size, version, arity; switch_event_t *call_event=NULL;
char tupletag[MAXATOMLEN]; switch_channel_t *channel=NULL;
char atom[MAXATOMLEN];
buf->index = 0;
ei_decode_version(buf->buff, &buf->index, &version);
ei_get_type(buf->buff, &buf->index, &type, &size);
switch(type) {
case ERL_SMALL_TUPLE_EXT :
case ERL_LARGE_TUPLE_EXT :
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
if (ei_decode_atom(buf->buff, &buf->index, tupletag)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) { /* Send a message to the associated registered process to let it know there is a call.
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; Message is a tuple of the form {call, <call-event>}
*/
if (ei_decode_string(buf->buff, &buf->index, uuid_str)) { channel = switch_core_session_get_channel(session);
ei_x_encode_tuple_header(rbuf, 2); if (switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA) != SWITCH_STATUS_SUCCESS) {
ei_x_encode_atom(rbuf, "error"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n");
ei_x_encode_atom(rbuf, "badarg"); return SWITCH_STATUS_MEMERR;
break;
} }
switch_caller_profile_event_set_data(switch_channel_get_caller_profile(channel), "Channel", call_event);
switch_channel_event_set_data(channel, call_event);
switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Content-Type", "command/reply");
switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Reply-Text", "+OK\n");
ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(nbuf)); ei_x_buff lbuf;
/*char *wtf = "hello world";*/ ei_x_new_with_version(&lbuf);
nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz); ei_x_encode_tuple_header(&lbuf, 2);
memcpy(nbuf->buff, buf->buff, buf->buffsz); ei_x_encode_atom(&lbuf, "call");
/*memcpy(nbuf, wtf, 20);*/ ei_encode_switch_event(&lbuf, call_event);
nbuf->index = buf->index; switch_mutex_lock(listener->sock_mutex);
nbuf->buffsz = buf->buffsz; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending initial call event\n");
if (ei_reg_send(listener->ec,listener->sockfd, reg_name, lbuf.buff, lbuf.index)==ERL_ERROR) {
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "stored %d %d %s\n", buf->index, buf->buffsz, nbuf);*/ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send call event\n");
}
switch_mutex_unlock(listener->sock_mutex);
switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); ei_x_free(&lbuf);
return SWITCH_STATUS_SUCCESS;
}
} else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) { static switch_status_t check_attached_sessions(listener_t *listener)
if (arity == 2) { {
switch_log_level_t ltype = SWITCH_LOG_DEBUG; session_elem_t *last,*sp;
char loglevelstr[MAXATOMLEN]; switch_status_t status = SWITCH_STATUS_SUCCESS;
if (ei_decode_atom(buf->buff, &buf->index, loglevelstr)) { void *pop;
ei_x_encode_tuple_header(rbuf, 2); /* check up on all the attached sessions -
ei_x_encode_atom(rbuf, "error"); if they have not yet sent an initial call event to the associated erlang process then do so
ei_x_encode_atom(rbuf, "badarg"); if they have pending events in their queues then send them
if the session has finished then clean it up
*/
switch_mutex_lock(listener->session_mutex);
sp = listener->session_list;
last = NULL;
while(sp) {
if (!switch_test_flag(sp, LFLAG_OUTBOUND_INIT)) {
status = notify_new_session(listener, sp->session, sp->reg_name);
if (status != SWITCH_STATUS_SUCCESS)
break; break;
switch_set_flag(sp, LFLAG_OUTBOUND_INIT);
} }
ltype = switch_log_str2level(loglevelstr); /* check event queue for this session */
if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_event_t *pevent = (switch_event_t *) pop;
if (ltype && ltype != SWITCH_LOG_INVALID) { /* events from attached sessions are wrapped in a {call_event,<EVT>} tuple
listener->level = ltype; to distinguish them from normal events (if they are sent to the same process)
} else { */
ei_x_encode_tuple_header(rbuf, 2); ei_x_buff ebuf;
ei_x_encode_atom(rbuf, "error"); ei_x_new_with_version(&ebuf);
ei_x_encode_atom(rbuf, "badarg"); ei_x_encode_tuple_header(&ebuf, 2);
break; ei_x_encode_atom(&ebuf, "call_event");
} ei_encode_switch_event(&ebuf, pevent);
} else {
/* tuple too long */
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
} else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
if (arity == 1) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
int custom = 0; switch_mutex_lock(listener->sock_mutex);
switch_event_types_t type; ei_reg_send(listener->ec, listener->sockfd, sp->reg_name, ebuf.buff, ebuf.index);
switch_mutex_unlock(listener->sock_mutex);
if (!switch_test_flag(listener, LFLAG_EVENTS)) { /* event is a hangup, so this session can be removed */
switch_set_flag_locked(listener, LFLAG_EVENTS); if (pevent->event_id == SWITCH_EVENT_CHANNEL_HANGUP) {
} switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hangup event for attached session\n");
for (int i = 1; i < arity; i++) { /* remove session from list */
if (!ei_decode_atom(buf->buff, &buf->index, atom)) { if (last)
last->next = sp->next;
else
listener->session_list = sp->next;
if (custom) { /* this allows the application threads to exit */
switch_core_hash_insert(listener->event_hash, atom, MARKER); switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE);
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
if (type == SWITCH_EVENT_ALL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n");
uint32_t x = 0;
for (x = 0; x < SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 1;
}
}
if (type <= SWITCH_EVENT_ALL) {
listener->event_list[type] = 1;
}
if (type == SWITCH_EVENT_CUSTOM) {
custom++;
}
/* TODO
if this listener was created outbound, and the last session has been detached
should the listener also exit? Does it matter?
*/
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
}
}
} else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {
if (arity == 1) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
int custom = 0;
switch_event_types_t type;
for (int i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
if (custom) {
switch_core_hash_delete(listener->event_hash, atom);
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
uint32_t x = 0;
if (type == SWITCH_EVENT_CUSTOM) { ei_x_free(&ebuf);
custom++; switch_event_destroy(&pevent);
} else if (type == SWITCH_EVENT_ALL) {
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 0;
}
} else {
if (listener->event_list[SWITCH_EVENT_ALL]) {
listener->event_list[SWITCH_EVENT_ALL] = 0;
for (x = 0; x < SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 1;
}
}
listener->event_list[type] = 0;
}
}
}
} }
} else if (!strncmp(tupletag, "api", MAXATOMLEN)) { last = sp;
if (arity < 3) { sp = sp->next;
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
} }
switch_mutex_unlock(listener->session_mutex);
return status;
}
char api_cmd[MAXATOMLEN]; static void check_log_queue(listener_t *listener)
char arg[1024]; {
void *pop;
if (ei_decode_atom(buf->buff, &buf->index, api_cmd)) { /* send out any pending crap in the log queue */
ei_x_encode_tuple_header(rbuf, 2); if (switch_test_flag(listener, LFLAG_LOG)) {
ei_x_encode_atom(rbuf, "error"); if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
ei_x_encode_atom(rbuf, "badarg"); switch_log_node_t *dnode = (switch_log_node_t *) pop;
break;
}
if (ei_decode_string(buf->buff, &buf->index, arg)) { if (dnode->data) {
ei_x_encode_tuple_header(rbuf, 2); ei_x_buff lbuf;
ei_x_encode_atom(rbuf, "error"); ei_x_new_with_version(&lbuf);
ei_x_encode_atom(rbuf, "badarg"); ei_x_encode_tuple_header(&lbuf, 2);
break; ei_x_encode_atom(&lbuf, "log");
} ei_x_encode_list_header(&lbuf, 6);
struct api_command_struct acs = { 0 };
acs.listener = listener;
acs.api_cmd = api_cmd;
acs.arg = arg;
acs.bg = 0;
acs.pid = msg->from;
api_exec(NULL, (void *) &acs);
goto noreply;
} else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) {
if (arity < 3) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
char api_cmd[MAXATOMLEN]; ei_x_encode_tuple_header(&lbuf, 2);
char arg[1024]; ei_x_encode_atom(&lbuf, "level");
ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
if (ei_decode_atom(buf->buff, &buf->index, api_cmd)) { ei_x_encode_tuple_header(&lbuf, 2);
ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(&lbuf, "text_channel");
ei_x_encode_atom(rbuf, "error"); ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
ei_x_encode_atom(rbuf, "badarg");
break;
}
if (ei_decode_string(buf->buff, &buf->index, arg)) { ei_x_encode_tuple_header(&lbuf, 2);
ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(&lbuf, "file");
ei_x_encode_atom(rbuf, "error"); ei_x_encode_string(&lbuf, dnode->file);
ei_x_encode_atom(rbuf, "badarg");
break;
}
struct api_command_struct *acs = NULL; ei_x_encode_tuple_header(&lbuf, 2);
switch_memory_pool_t *pool; ei_x_encode_atom(&lbuf, "func");
switch_thread_t *thread; ei_x_encode_string(&lbuf, dnode->func);
switch_threadattr_t *thd_attr = NULL;
switch_uuid_t uuid;
switch_core_new_memory_pool(&pool); ei_x_encode_tuple_header(&lbuf, 2);
acs = switch_core_alloc(pool, sizeof(*acs)); ei_x_encode_atom(&lbuf, "line");
switch_assert(acs); ei_x_encode_ulong(&lbuf, (unsigned long)dnode->line);
acs->pool = pool;
acs->listener = listener;
acs->api_cmd = switch_core_strdup(acs->pool, api_cmd);
acs->arg = switch_core_strdup(acs->pool, arg);
acs->bg = 1;
acs->pid = msg->from;
switch_threadattr_create(&thd_attr, acs->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_uuid_get(&uuid); ei_x_encode_tuple_header(&lbuf, 2);
switch_uuid_format(acs->uuid_str, &uuid); ei_x_encode_atom(&lbuf, "data");
switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool); ei_x_encode_string(&lbuf, dnode->data);
ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_empty_list(&lbuf);
ei_x_encode_atom(rbuf, "ok");
ei_x_encode_string(rbuf, acs->uuid_str);
break; switch_mutex_lock(listener->sock_mutex);
} else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) { ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
char ename[MAXATOMLEN]; switch_mutex_unlock(listener->sock_mutex);
if (ei_decode_atom(buf->buff, &buf->index, ename)) { ei_x_free(&lbuf);
ei_x_encode_tuple_header(rbuf, 2); free(dnode->data);
ei_x_encode_atom(rbuf, "error"); free(dnode);
ei_x_encode_atom(rbuf, "badarg");
break;
} }
int headerlength;
if (ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
} }
}
}
switch_event_types_t etype; static void check_event_queue(listener_t *listener)
if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) { {
switch_event_t *event; void* pop;
/* send out any pending crap in the event queue */
if (switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) { if (switch_test_flag(listener, LFLAG_EVENTS)) {
if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
char key[1024];
char value[1024];
int i = 0;
while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
i++;
if (ei_decode_string(buf->buff, &buf->index, key))
goto sendevent_fail;
if (ei_decode_string(buf->buff, &buf->index, value))
goto sendevent_fail;
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value); switch_event_t *pevent = (switch_event_t *) pop;
}
if (headerlength != i) ei_x_buff ebuf;
goto sendevent_fail; ei_x_new_with_version(&ebuf);
ei_encode_switch_event(&ebuf, pevent);
switch_event_fire(&event); switch_mutex_lock(listener->sock_mutex);
ei_x_encode_atom(rbuf, "ok"); ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
break; switch_mutex_unlock(listener->sock_mutex);
sendevent_fail: ei_x_free(&ebuf);
ei_x_encode_tuple_header(rbuf, 2); switch_event_destroy(&pevent);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
} }
} }
} else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) { }
char uuid[37];
if (ei_decode_string(buf->buff, &buf->index, uuid)) { static void listener_main_loop(listener_t *listener)
ei_x_encode_tuple_header(rbuf, 2); {
ei_x_encode_atom(rbuf, "error"); int status = 1;
ei_x_encode_atom(rbuf, "badarg");
break;
}
switch_core_session_t *session; while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) {
if (!switch_strlen_zero(uuid) && (session = switch_core_session_locate(uuid))) { erlang_msg msg;
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "nosession");
break;
}
int headerlength; ei_x_buff buf;
ei_x_new(&buf);
if (ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
switch_event_t *event;
if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) {
char key[1024];
char value[1024];
int i = 0;
while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
i++;
if (ei_decode_string(buf->buff, &buf->index, key))
goto sendmsg_fail;
if (ei_decode_string(buf->buff, &buf->index, value))
goto sendmsg_fail;
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
}
if (headerlength != i)
goto sendmsg_fail;
if (switch_core_session_queue_private_event(session, &event) == SWITCH_STATUS_SUCCESS) {
ei_x_encode_atom(rbuf, "ok");
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badmem");
}
/* release the lock returned by switch_core_locate_session */
switch_core_session_rwunlock(session);
break;
sendmsg_fail:
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
/* format is (result|config|directory|dialplan|phrases) */
char sectionstr[MAXATOMLEN];
if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
switch_xml_section_t section;
if (!(section = switch_xml_parse_section_string(sectionstr))) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
struct erlang_binding *binding, *ptr;
if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badmem");
break;
}
binding->section = section;
binding->pid = msg->from;
binding->listener = listener;
switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
switch_mutex_lock(globals.listener_mutex);
for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
if (ptr) {
ptr->next = binding;
} else {
bindings.head = binding;
}
switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
switch_mutex_unlock(globals.listener_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
ei_link(listener, ei_self(listener->ec), &msg->from);
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
break;
}
ei_x_encode_atom(rbuf, "ok");
break;
case ERL_ATOM_EXT :
if (ei_decode_atom(buf->buff, &buf->index, atom)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
if (!strncmp(atom, "nolog", MAXATOMLEN)) {
if (switch_test_flag(listener, LFLAG_LOG)) {
switch_clear_flag_locked(listener, LFLAG_LOG);
}
} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
ei_link(listener, ei_self(listener->ec), &msg->from);
listener->log_pid = msg->from;
listener->level = SWITCH_LOG_DEBUG;
switch_set_flag(listener, LFLAG_LOG);
} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
ei_link(listener, ei_self(listener->ec), &msg->from);
listener->event_pid = msg->from;
if (!switch_test_flag(listener, LFLAG_EVENTS)) {
switch_set_flag_locked(listener, LFLAG_EVENTS);
}
} else if (!strncmp(atom, "noevents", MAXATOMLEN)) {
void *pop;
/*purge the event queue */
while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
if (switch_test_flag(listener, LFLAG_EVENTS)) {
uint8_t x = 0;
switch_clear_flag_locked(listener, LFLAG_EVENTS);
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 0;
}
/* wipe the hash */
switch_core_hash_destroy(&listener->event_hash);
switch_core_hash_init(&listener->event_hash, listener->pool);
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "notlistening");
break;
}
} else if (!strncmp(atom, "exit", MAXATOMLEN)) {
switch_clear_flag_locked(listener, LFLAG_RUNNING);
ei_x_encode_atom(rbuf, "ok");
goto event_done;
} else if (!strncmp(atom, "getpid", MAXATOMLEN)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "ok");
ei_x_encode_pid(rbuf, ei_self(listener->ec));
} else if (!strncmp(atom, "link", MAXATOMLEN)) {
/* debugging */
ei_link(listener, ei_self(listener->ec), &msg->from);
goto noreply;
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
break;
}
ei_x_encode_atom(rbuf, "ok");
break;
default :
/* some other kind of erlang term */
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
break;
}
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex);
noreply:
return 0;
event_done:
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex);
return 1;
}
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
{
listener_t *listener = (listener_t *) obj;
switch_core_session_t *session = NULL;
switch_channel_t *channel = NULL;
int status = 1;
void *pop;
switch_mutex_lock(globals.listener_mutex);
prefs.threads++;
switch_mutex_unlock(globals.listener_mutex);
switch_assert(listener != NULL);
if (prefs.acl_count && !switch_strlen_zero(listener->remote_ip)) {
uint32_t x = 0;
for (x = 0; x < prefs.acl_count; x++) {
if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
erlang_msg msg;
ei_x_buff buf;
ei_x_new(&buf);
status = ei_xreceive_msg(listener->sockfd, &msg, &buf);
/* get data off the socket, just so we can get the pid on the other end */
if (status == ERL_MSG) {
/* if we got a message, return an ACL error. */
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
ei_x_encode_tuple_header(&rbuf, 2);
ei_x_encode_atom(&rbuf, "error");
ei_x_encode_atom(&rbuf, "acldeny");
ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index);
ei_x_free(&rbuf);
}
ei_x_free(&buf);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection from %s denied by acl %s\n", listener->remote_ip, prefs.acl[x]);
goto done;
}
}
}
if ((session = listener->session)) {
channel = switch_core_session_get_channel(session);
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
goto done;
}
}
if (switch_strlen_zero(listener->remote_ip)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open\n");
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);/*, listener->remote_port);*/
}
switch_set_flag_locked(listener, LFLAG_RUNNING);
add_listener(listener);
while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) {
erlang_msg msg;
ei_x_buff buf;
ei_x_new(&buf);
ei_x_buff rbuf; ei_x_buff rbuf;
ei_x_new_with_version(&rbuf); ei_x_new_with_version(&rbuf);
...@@ -1237,13 +595,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ...@@ -1237,13 +595,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
case ERL_SEND : case ERL_SEND :
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/ /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/
if (handle_msg(listener, &msg, &buf, &rbuf)) { if (handle_msg(listener, &msg, &buf, &rbuf)) {
goto done; return;
} }
break; break;
case ERL_REG_SEND : case ERL_REG_SEND :
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send\n");*/ /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send\n");*/
if (handle_msg(listener, &msg, &buf, &rbuf)) { if (handle_msg(listener, &msg, &buf, &rbuf)) {
goto done; return;
} }
break; break;
case ERL_LINK : case ERL_LINK :
...@@ -1275,77 +633,76 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ...@@ -1275,77 +633,76 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
ei_x_free(&buf); ei_x_free(&buf);
ei_x_free(&rbuf); ei_x_free(&rbuf);
/* send out any pending crap in the log queue */ check_log_queue(listener);
if (switch_test_flag(listener, LFLAG_LOG)) { check_event_queue(listener);
if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { if (SWITCH_STATUS_SUCCESS != check_attached_sessions(listener)) {
switch_log_node_t *dnode = (switch_log_node_t *) pop; return;
}
if (dnode->data) { }
ei_x_buff lbuf; }
ei_x_new_with_version(&lbuf);
ei_x_encode_tuple_header(&lbuf, 2);
ei_x_encode_atom(&lbuf, "log");
ei_x_encode_list_header(&lbuf, 6);
ei_x_encode_tuple_header(&lbuf, 2);
ei_x_encode_atom(&lbuf, "level");
ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
ei_x_encode_tuple_header(&lbuf, 2); static switch_bool_t check_inbound_acl(listener_t* listener)
ei_x_encode_atom(&lbuf, "text_channel"); {
ei_x_encode_char(&lbuf, (unsigned char)dnode->level); /* check acl to see if inbound connection is allowed */
if (prefs.acl_count && !switch_strlen_zero(listener->remote_ip)) {
uint32_t x = 0;
for (x = 0; x < prefs.acl_count; x++) {
if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
int status = 1;
erlang_msg msg;
ei_x_encode_tuple_header(&lbuf, 2); ei_x_buff buf;
ei_x_encode_atom(&lbuf, "file"); ei_x_new(&buf);
ei_x_encode_string(&lbuf, dnode->file);
ei_x_encode_tuple_header(&lbuf, 2); status = ei_xreceive_msg(listener->sockfd, &msg, &buf);
ei_x_encode_atom(&lbuf, "func"); /* get data off the socket, just so we can get the pid on the other end */
ei_x_encode_string(&lbuf, dnode->func); if (status == ERL_MSG) {
/* if we got a message, return an ACL error. */
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
ei_x_encode_tuple_header(&lbuf, 2); ei_x_encode_tuple_header(&rbuf, 2);
ei_x_encode_atom(&lbuf, "line"); ei_x_encode_atom(&rbuf, "error");
ei_x_encode_ulong(&lbuf, (unsigned long)dnode->line); ei_x_encode_atom(&rbuf, "acldeny");
ei_x_encode_tuple_header(&lbuf, 2); ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index);
ei_x_encode_atom(&lbuf, "data");
ei_x_encode_string(&lbuf, dnode->data);
ei_x_encode_empty_list(&lbuf); ei_x_free(&rbuf);
}
switch_mutex_lock(listener->sock_mutex); ei_x_free(&buf);
ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&lbuf); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection from %s denied by acl %s\n", listener->remote_ip, prefs.acl[x]);
free(dnode->data); return SWITCH_FALSE;
free(dnode);
} }
} }
} }
return SWITCH_TRUE;
}
/* ditto with the event queue */ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
if (switch_test_flag(listener, LFLAG_EVENTS)) { {
if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { listener_t *listener = (listener_t *) obj;
session_elem_t* s;
switch_event_t *pevent = (switch_event_t *) pop;
ei_x_buff ebuf;
ei_x_new_with_version(&ebuf);
ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(globals.listener_mutex);
prefs.threads++;
switch_mutex_unlock(globals.listener_mutex);
switch_mutex_lock(listener->sock_mutex); switch_assert(listener != NULL);
ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&ebuf); if (check_inbound_acl(listener)) {
switch_event_destroy(&pevent); if (switch_strlen_zero(listener->remote_ip)) {
} switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open\n");
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);/*, listener->remote_port);*/
} }
add_listener(listener);
listener_main_loop(listener);
} }
done: /* clean up */
remove_listener(listener); remove_listener(listener);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
...@@ -1364,16 +721,22 @@ done: ...@@ -1364,16 +721,22 @@ done:
/* remove any bindings for this connection */ /* remove any bindings for this connection */
remove_binding(listener, NULL); remove_binding(listener, NULL);
if (listener->session) { /* clean up all the attached sessions */
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); switch_mutex_lock(listener->session_mutex);
switch_clear_flag_locked(listener, LFLAG_SESSION); for (s = listener->session_list; s; s = s->next) {
switch_core_session_rwunlock(listener->session); switch_channel_clear_flag(switch_core_session_get_channel(s->session), CF_CONTROLLED);
} else if (listener->pool) { /* this allows the application threads to exit */
switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE);
/* */
switch_core_session_rwunlock(s->session);
}
switch_mutex_unlock(listener->session_mutex);
if (listener->pool) {
switch_memory_pool_t *pool = listener->pool; switch_memory_pool_t *pool = listener->pool;
switch_core_destroy_memory_pool(&pool); switch_core_destroy_memory_pool(&pool);
} }
switch_mutex_lock(globals.listener_mutex); switch_mutex_lock(globals.listener_mutex);
prefs.threads--; prefs.threads--;
switch_mutex_unlock(globals.listener_mutex); switch_mutex_unlock(globals.listener_mutex);
...@@ -1454,12 +817,148 @@ static int config(void) ...@@ -1454,12 +817,148 @@ static int config(void)
return 0; return 0;
} }
static listener_t* new_listener(struct ei_cnode_s *ec, int clientfd)
{
switch_memory_pool_t *listener_pool = NULL;
listener_t* listener = NULL;
if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
return NULL;
}
if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
return NULL;
}
switch_thread_rwlock_create(&listener->rwlock, listener_pool);
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
listener->ec = ec;
listener->sockfd = clientfd;
listener->pool = listener_pool;
listener_pool = NULL;
listener->level = SWITCH_LOG_DEBUG;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
return listener;
}
static listener_t* new_outbound_listener(char* node)
{
listener_t* listener = NULL;
struct ei_cnode_s ec;
int clientfd;
if (SWITCH_STATUS_SUCCESS==initialise_ei(&ec)) {
errno = 0;
if ((clientfd=ei_connect(&ec,node)) == ERL_ERROR) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n",node,erl_errno,errno);
return NULL;
}
listener = new_listener(&ec,clientfd);
listener->peer_nodename = switch_core_strdup(listener->pool,node);
}
return listener;
}
session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session)
{
/* create a session list element */
session_elem_t* session_element=NULL;
if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
}
else {
if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n");
}
else {
session_element->session = session;
session_element->reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name);
switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
/* attach the session to the listener */
add_session_elem_to_listener(listener,session_element);
}
}
return session_element;
}
/* Module Hooks */ /* Module Hooks */
/* Entry point for outbound mode */
SWITCH_STANDARD_APP(erlang_outbound_function)
{
char *reg_name, *node;
listener_t *listener;
int argc = 0;
char *argv[80] = { 0 };
char *mydata;
switch_bool_t new_session = SWITCH_FALSE;
session_elem_t* session_element=NULL;
/* process app arguments */
if (data && (mydata = switch_core_session_strdup(session, data))) {
argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
}
if (argc < 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse Error - need registered name and node!\n");
return;
}
reg_name = argv[0];
if (switch_strlen_zero(reg_name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing registered name!\n");
return;
}
node = argv[1];
if (switch_strlen_zero(node)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing node name!\n");
return;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n",reg_name, node);
/* first work out if there is a listener already talking to the node we want to talk to */
listener = find_listener(node);
/* if there is no listener, then create one */
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new listener for session\n");
new_session = SWITCH_TRUE;
listener = new_outbound_listener(node);
}
else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Using existing listener for session\n");
}
if (listener &&
(session_element=attach_call_to_listener(listener,reg_name,session)) != NULL) {
if (new_session)
launch_listener_thread(listener);
switch_ivr_park(session, NULL);
/* keep app thread running for lifetime of session */
if (switch_channel_get_state(switch_core_session_get_channel(session)) >= CS_HANGUP) {
while (switch_test_flag(session_element, LFLAG_SESSION_ALIVE)) {
switch_yield(100000);
}
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "exit erlang_outbound_function\n");
}
SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
{ {
switch_application_interface_t *app_interface;
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
...@@ -1483,6 +982,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) ...@@ -1483,6 +982,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
/* connect my internal structure to the blank pointer passed to me */ /* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname); *module_interface = switch_loadable_module_create_module_interface(pool, modname);
SWITCH_ADD_APP(app_interface, "erlang", "Connect to an erlang node", "Connect to erlang", erlang_outbound_function, "<registered name> <node@host>", SAF_SUPPORT_NOMEDIA);
/* indicate that the module should continue to be loaded */ /* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
...@@ -1557,25 +1058,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) ...@@ -1557,25 +1058,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
switch_yield(100000); switch_yield(100000);
} }
struct hostent *nodehost = gethostbyaddr(&server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET); if (SWITCH_STATUS_SUCCESS!=initialise_ei(&ec)) {
char *thishostname = nodehost->h_name;
char thisnodename[MAXNODELEN+1];
if (!strcmp(thishostname, "localhost"))
gethostname(thishostname, EI_MAXHOSTNAMELEN);
if (prefs.shortname) {
char *off;
if ((off = strchr(thishostname, '.'))) {
*off = '\0';
}
}
snprintf(thisnodename, MAXNODELEN+1, "%s@%s", prefs.nodename, thishostname);
/* init the ei stuff */
if (ei_connect_xinit(&ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n");
close_socket(&listen_list.sockfd); close_socket(&listen_list.sockfd);
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
...@@ -1596,7 +1079,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) ...@@ -1596,7 +1079,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
} }
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode at %s port %u\n", thisnodename, prefs.port); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode\n");
listen_list.ready = 1; listen_list.ready = 1;
...@@ -1621,59 +1104,42 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) ...@@ -1621,59 +1104,42 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
break; break;
} }
if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) { listener = new_listener(&ec,clientfd);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); if (listener) {
goto fail; /* store the IP and node name we are talking with */
}
if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
break;
}
switch_thread_rwlock_create(&listener->rwlock, listener_pool);
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip)); inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));
listener->peer_nodename = switch_core_strdup(listener->pool,conn.nodename);
listener->ec = &ec; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching listener, connection from node %s, ip %s\n", conn.nodename, listener->remote_ip);
listener->sockfd = clientfd;
listener->pool = listener_pool;
listener_pool = NULL;
listener->level = SWITCH_LOG_DEBUG;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
launch_listener_thread(listener); launch_listener_thread(listener);
} }
else
/* if we fail to create a listener (memory error), then the module will exit */
break;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Exiting module mod_erlang_event\n");
/* cleanup epmd registration */ /* cleanup epmd registration */
ei_unpublish(&ec); ei_unpublish(&ec);
close(epmdfd); close(epmdfd);
close_socket(&listen_list.sockfd); close_socket(&listen_list.sockfd);
if (pool) { if (pool) {
switch_core_destroy_memory_pool(&pool); switch_core_destroy_memory_pool(&pool);
} }
if (listener_pool) { if (listener_pool) {
switch_core_destroy_memory_pool(&listener_pool); switch_core_destroy_memory_pool(&listener_pool);
} }
for (x = 0; x < prefs.acl_count; x++) { for (x = 0; x < prefs.acl_count; x++) {
switch_safe_free(prefs.acl[x]); switch_safe_free(prefs.acl[x]);
} }
fail:
prefs.done = 2; prefs.done = 2;
return SWITCH_STATUS_TERM; return SWITCH_STATUS_TERM;
} }
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
{ {
listener_t *l; listener_t *l;
......
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005/2006, Anthony Minessale II <anthmct@yahoo.com>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthmct@yahoo.com>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Anthony Minessale II <anthmct@yahoo.com>
* Andrew Thompson <andrew@hijacked.us>
* Rob Charlton <rob.charlton@savageminds.com>
*
*
* mod_erlang_event.h -- Erlang Event Handler derived from mod_event_socket
*
*/
typedef enum {
LFLAG_OUTBOUND_INIT = (1 << 0), /* Erlang peer has been notified of this session */
LFLAG_SESSION_ALIVE
} session_flag_t;
struct session_elem {
switch_core_session_t *session;
switch_mutex_t *flag_mutex;
uint32_t flags;
/* registered process name that will receive call notifications from this session */
char* reg_name;
switch_queue_t *event_queue;
struct session_elem *next;
};
typedef struct session_elem session_elem_t;
typedef enum {
LFLAG_RUNNING = (1 << 0),
LFLAG_EVENTS = (1 << 1),
LFLAG_LOG = (1 << 2),
LFLAG_MYEVENTS = (1 << 3),
LFLAG_STATEFUL = (1 << 4)
} event_flag_t;
/* There is one listener for each Erlang node we are attached to - either
inbound or outbound. For example, if the erlang node node1@server connects
to freeswitch then a listener is created and handles commands sent from
that node. If 5 calls are directed to the outbound erlang application
via the dialplan, and are also set to talk to node1@server, then those
5 call sessions will be "attached" to the same listener.
*/
struct listener {
int sockfd;
struct ei_cnode_s *ec;
erlang_pid log_pid;
erlang_pid event_pid;
char *peer_nodename;
switch_queue_t *event_queue;
switch_queue_t *log_queue;
switch_memory_pool_t *pool;
switch_mutex_t *flag_mutex;
switch_mutex_t *sock_mutex;
char *ebuf;
uint32_t flags;
switch_log_level_t level;
uint8_t event_list[SWITCH_EVENT_ALL + 1];
switch_hash_t *event_hash;
switch_hash_t *fetch_reply_hash;
switch_thread_rwlock_t *rwlock;
switch_mutex_t *session_mutex;
session_elem_t *session_list;
int lost_events;
int lost_logs;
time_t last_flush;
uint32_t timeout;
uint32_t id;
char remote_ip[50];
/*switch_port_t remote_port;*/
struct listener *next;
};
typedef struct listener listener_t;
struct erlang_binding {
switch_xml_section_t section;
erlang_pid pid;
char *registered_process; /* TODO */
listener_t *listener;
struct erlang_binding *next;
};
struct api_command_struct {
char *api_cmd;
char *arg;
listener_t *listener;
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
uint8_t bg;
erlang_pid pid;
switch_memory_pool_t *pool;
};
struct globals_struct {
switch_mutex_t *listener_mutex;
switch_event_node_t *node;
};
typedef struct globals_struct globals_t;
struct listen_list_struct {
int sockfd;
switch_mutex_t *sock_mutex;
listener_t *listeners;
uint8_t ready;
};
typedef struct listen_list_struct listen_list_t;
struct bindings_struct {
struct erlang_binding *head;
switch_xml_binding_t *search_binding;
};
typedef struct bindings_struct bindings_t;
#define MAX_ACL 100
struct prefs_struct {
switch_mutex_t *mutex;
char *ip;
char *nodename;
switch_bool_t shortname;
uint16_t port;
char *cookie;
int done;
int threads;
char *acl[MAX_ACL];
uint32_t acl_count;
uint32_t id;
};
typedef struct prefs_struct prefs_t;
/* shared globals */
#ifdef DEFINE_GLOBALS
globals_t globals;
listen_list_t listen_list;
bindings_t bindings;
prefs_t prefs;
#else
extern globals_t globals;
extern listen_list_t listen_list;
extern bindings_t bindings;
extern prefs_t prefs;
#endif
/* function prototypes */
/* handle_msg.c */
int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf);
/* ei_helpers.c */
void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to);
void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event);
void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag);
switch_status_t initialise_ei(struct ei_cnode_s *ec);
#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event")
/* mod_erlang_event.c */
session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session);
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4:
*/
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论