提交 5fe69d30 authored 作者: Andrew Thompson's avatar Andrew Thompson

Initial support to spawn a process (module/function) outbound on a specified…

Initial support to spawn a process (module/function) outbound on a specified node. Also fix some bugs.


git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11477 d0543943-73ff-0310-b7d9-9358b9ac24b2
上级 e14c57dd
...@@ -123,29 +123,29 @@ void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *ta ...@@ -123,29 +123,29 @@ void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *ta
/* function to spawn a process on a remote node */ /* function to spawn a process on a remote node */
int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, int argc, char **argv) int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv)
{ {
ei_x_buff buf; ei_x_buff buf;
ei_x_new_with_version(&buf); ei_x_new_with_version(&buf);
erlang_ref ref;
int i; int i;
ei_x_encode_tuple_header(&buf, 3); ei_x_encode_tuple_header(&buf, 3);
ei_x_encode_atom(&buf, "$gen_call"); ei_x_encode_atom(&buf, "$gen_call");
ei_x_encode_tuple_header(&buf, 2); ei_x_encode_tuple_header(&buf, 2);
ei_x_encode_pid(&buf, ei_self(ec)); ei_x_encode_pid(&buf, ei_self(ec));
/* TODO - use this reference to determine the response */ ei_init_ref(ec, ref);
ei_init_ref(ec, &ref); ei_x_encode_ref(&buf, ref);
ei_x_encode_ref(&buf, &ref);
ei_x_encode_tuple_header(&buf, 5); ei_x_encode_tuple_header(&buf, 5);
ei_x_encode_atom(&buf, "spawn"); ei_x_encode_atom(&buf, "spawn");
ei_x_encode_atom(&buf, module); ei_x_encode_atom(&buf, module);
ei_x_encode_atom(&buf, function); ei_x_encode_atom(&buf, function);
/* argument list */ /* argument list */
ei_x_encode_list_header(&buf, argc); if (argc < 0) {
for(i = 0; i < argc && argv[i]; i++) { ei_x_encode_list_header(&buf, argc);
ei_x_encode_atom(&buf, argv[i]); for(i = 0; i < argc && argv[i]; i++) {
ei_x_encode_atom(&buf, argv[i]);
}
} }
ei_x_encode_empty_list(&buf); ei_x_encode_empty_list(&buf);
...@@ -156,12 +156,11 @@ int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, in ...@@ -156,12 +156,11 @@ int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, in
ei_x_encode_pid(&buf, ei_self(ec)); /* should really be a valid group leader */ ei_x_encode_pid(&buf, ei_self(ec)); /* should really be a valid group leader */
char *pbuf = 0; #ifdef EI_DEBUG
i = 1; ei_x_print_reg_msg(&buf, "net_kernel", 1);
ei_s_print_term(&pbuf, buf.buff, &i); #endif
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "spawn returning %s\n", pbuf);
return ei_reg_send(ec, sockfd, "net_kernel", buf.buff, buf.index); return ei_reg_send(ec, sockfd, "net_kernel", buf.buff, buf.index);
} }
...@@ -196,6 +195,69 @@ void ei_init_ref(ei_cnode *ec, erlang_ref *ref) ...@@ -196,6 +195,69 @@ void ei_init_ref(ei_cnode *ec, erlang_ref *ref)
} }
void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send)
{
char *mbuf = NULL;
int i = 1;
ei_s_print_term(&mbuf, buf->buff, &i);
if (send) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending %s to %s\n", mbuf, dest);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received %s from %s\n", mbuf, dest);
}
free(mbuf);
}
void ei_x_print_msg(ei_x_buff *buf, erlang_pid *pid, int send)
{
char *pbuf = NULL;
int i = 0;
ei_x_buff pidbuf;
ei_x_new(&pidbuf);
ei_x_encode_pid(&pidbuf, pid);
ei_s_print_term(&pbuf, pidbuf.buff, &i);
ei_x_print_reg_msg(buf, pbuf, send);
free(pbuf);
}
int ei_sendto(ei_cnode *ec, int fd, struct erlang_process *process, ei_x_buff *buf)
{
int ret;
if (process->type == ERLANG_PID) {
ret = ei_send(fd, &process->pid, buf->buff, buf->index);
#ifdef EI_DEBUG
ei_x_print_msg(buf, &process->pid, 1);
#endif
} else if (process->type == ERLANG_REG_PROCESS) {
ret = ei_reg_send(ec, fd, process->reg_name, buf->buff, buf->index);
#ifdef EI_DEBUG
ei_x_print_reg_msg(buf, process->reg_name, 1);
#endif
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid process type!\n");
/* wuh-oh */
ret = -1;
}
return ret;
}
/* convert an erlang reference to some kind of hashed string so we can store it as a hash key */
void ei_hash_ref(erlang_ref *ref, char *output)
{
/* very lazy */
sprintf(output, "%d.%d.%d@%s", ref->n[0], ref->n[1], ref->n[2], ref->node);
}
switch_status_t initialise_ei(struct ei_cnode_s *ec) switch_status_t initialise_ei(struct ei_cnode_s *ec)
{ {
switch_status_t rv; switch_status_t rv;
......
...@@ -107,6 +107,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) ...@@ -107,6 +107,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
switch_mutex_lock(acs->listener->sock_mutex); switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(acs->listener->sock_mutex); switch_mutex_unlock(acs->listener->sock_mutex);
#ifdef EI_DEBUG
ei_x_print_msg(&ebuf, &acs->pid, 1);
#endif
ei_x_free(&ebuf); ei_x_free(&ebuf);
} }
...@@ -132,6 +135,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) ...@@ -132,6 +135,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
switch_mutex_lock(acs->listener->sock_mutex); switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
switch_mutex_unlock(acs->listener->sock_mutex); switch_mutex_unlock(acs->listener->sock_mutex);
#ifdef EI_DEBUG
ei_x_print_msg(&rbuf, &acs->pid, 1);
#endif
ei_x_free(&rbuf); ei_x_free(&rbuf);
} }
...@@ -492,8 +498,6 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei ...@@ -492,8 +498,6 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei
binding->process.pid = msg->from; binding->process.pid = msg->from;
binding->listener = listener; binding->listener = listener;
switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
switch_mutex_lock(globals.listener_mutex); switch_mutex_lock(globals.listener_mutex);
for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next); for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
...@@ -532,7 +536,7 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei ...@@ -532,7 +536,7 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei
switch_core_session_t *session; switch_core_session_t *session;
if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) { if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) {
/* create a new session list element and attach it to this listener */ /* create a new session list element and attach it to this listener */
if (attach_call_to_listener(listener,reg_name,session)) { if (attach_call_to_registered_process(listener, reg_name, session)) {
ei_x_encode_atom(rbuf, "ok"); ei_x_encode_atom(rbuf, "ok");
} else { } else {
ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_tuple_header(rbuf, 2);
...@@ -660,9 +664,48 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e ...@@ -660,9 +664,48 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e
return ret; return ret;
} }
static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
erlang_ref ref;
erlang_pid *pid2, *pid = switch_core_alloc(listener->pool, sizeof(erlang_pid));
char hash[100];
int arity;
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
if (ei_decode_ref(buf->buff, &buf->index, &ref)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid reference\n");
return SWITCH_STATUS_FALSE;
}
if (ei_decode_pid(buf->buff, &buf->index, pid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n");
return SWITCH_STATUS_FALSE;
}
ei_hash_ref(&ref, hash);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
if ((pid2 = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) {
if (pid2 == NULL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found unfilled slot for %s\n", hash);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash);
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No slot for %s\n", hash);
switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
}
return SWITCH_STATUS_SUCCESS;
}
int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{ {
int type, size, version; int type, type2, size, version, arity, tmpindex;
switch_status_t ret = SWITCH_STATUS_SUCCESS; switch_status_t ret = SWITCH_STATUS_SUCCESS;
buf->index = 0; buf->index = 0;
...@@ -672,7 +715,27 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff ...@@ -672,7 +715,27 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff
switch(type) { switch(type) {
case ERL_SMALL_TUPLE_EXT : case ERL_SMALL_TUPLE_EXT :
case ERL_LARGE_TUPLE_EXT : case ERL_LARGE_TUPLE_EXT :
ret = handle_msg_tuple(listener,msg,buf,rbuf); tmpindex = buf->index;
ei_decode_tuple_header(buf->buff, &tmpindex, &arity);
ei_get_type(buf->buff, &tmpindex, &type2, &size);
switch(type2) {
case ERL_ATOM_EXT:
ret = handle_msg_tuple(listener,msg,buf,rbuf);
break;
case ERL_REFERENCE_EXT :
case ERL_NEW_REFERENCE_EXT :
handle_ref_tuple(listener, msg, buf, rbuf);
return 0;
default :
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d\n", type);
/* some other kind of erlang term */
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "undef");
break;
}
break; break;
case ERL_ATOM_EXT : case ERL_ATOM_EXT :
...@@ -687,17 +750,23 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff ...@@ -687,17 +750,23 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff
break; break;
} }
if (SWITCH_STATUS_FALSE==ret) if (SWITCH_STATUS_FALSE==ret) {
return 0; return 0;
else { } else if (rbuf->index > 1) {
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG
ei_x_print_msg(rbuf, &msg->from, 1);
#endif
if (SWITCH_STATUS_SUCCESS==ret) if (SWITCH_STATUS_SUCCESS==ret)
return 0; return 0;
else /* SWITCH_STATUS_TERM */ else /* SWITCH_STATUS_TERM */
return 1; return 1;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Empty reply, supressing\n");
return 0;
} }
} }
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
* *
*/ */
#define EI_DEBUG
typedef enum { typedef enum {
LFLAG_OUTBOUND_INIT = (1 << 0), /* Erlang peer has been notified of this session */ LFLAG_OUTBOUND_INIT = (1 << 0), /* Erlang peer has been notified of this session */
...@@ -93,6 +94,7 @@ struct listener { ...@@ -93,6 +94,7 @@ struct listener {
uint8_t event_list[SWITCH_EVENT_ALL + 1]; uint8_t event_list[SWITCH_EVENT_ALL + 1];
switch_hash_t *event_hash; switch_hash_t *event_hash;
switch_hash_t *fetch_reply_hash; switch_hash_t *fetch_reply_hash;
switch_hash_t *spawn_pid_hash;
switch_thread_rwlock_t *rwlock; switch_thread_rwlock_t *rwlock;
switch_mutex_t *session_mutex; switch_mutex_t *session_mutex;
session_elem_t *session_list; session_elem_t *session_list;
...@@ -186,13 +188,18 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff ...@@ -186,13 +188,18 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff
void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to); void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to);
void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event); void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event);
void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag); void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag);
int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, int argc, char **argv); int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv);
void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref); void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref);
void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send);
void ei_x_print_msg(ei_x_buff *buf, erlang_pid *pid, int send);
int ei_sendto(ei_cnode *ec, int fd, struct erlang_process *process, ei_x_buff *buf);
void ei_hash_ref(erlang_ref *ref, char *output);
switch_status_t initialise_ei(struct ei_cnode_s *ec); switch_status_t initialise_ei(struct ei_cnode_s *ec);
#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event") #define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event")
/* mod_erlang_event.c */ /* mod_erlang_event.c */
session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session); session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session);
session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session);
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论