提交 0f0f2781 authored 作者: Chris Rienzo's avatar Chris Rienzo

mod_rayo: reworked internal messaging- all messages sent by JID only

上级 911948c8
......@@ -42,6 +42,14 @@
#define RAYO_CALL_NS RAYO_BASE "call:" RAYO_VERSION
#define RAYO_MIXER_NS RAYO_BASE "mixer:" RAYO_VERSION
#define RAT_CALL "CALL"
#define RAT_COMPONENT "COMPONENT"
#define RAT_CALL_COMPONENT RAT_COMPONENT"_CALL"
#define RAT_MIXER "MIXER"
#define RAT_MIXER_COMPONENT RAT_COMPONENT"_MIXER"
#define RAT_SERVER "SERVER"
#define RAT_PEER_SERVER "PEER_SERVER"
#define RAT_CLIENT "CLIENT"
/* these are support punchblock.. undefine once punchblock is fixed */
#define RAYO_UUID_IN_REF_URI
......@@ -56,30 +64,21 @@ struct rayo_component;
*/
struct rayo_message {
iks *payload;
char *from_jid;
char *from_type;
char *from_subtype;
int is_reply;
};
typedef void (* rayo_actor_cleanup_fn)(struct rayo_actor *);
typedef void (* rayo_actor_send_fn)(struct rayo_actor *, struct rayo_actor *, struct rayo_message *, const char *file, int line);
/**
* Type of actor
*/
enum rayo_actor_type {
RAT_PEER_SERVER,
RAT_CLIENT,
RAT_SERVER,
RAT_CALL,
RAT_MIXER,
RAT_CALL_COMPONENT,
RAT_MIXER_COMPONENT
};
typedef void (* rayo_actor_send_fn)(struct rayo_actor *, struct rayo_message *, const char *file, int line);
/**
* A rayo actor - this is an entity that can be controlled by a rayo client
*/
struct rayo_actor {
/** Type of actor */
enum rayo_actor_type type;
char *type;
/** Sub-type of actor */
char *subtype;
/** domain part of JID */
......@@ -102,6 +101,8 @@ struct rayo_actor {
rayo_actor_send_fn send_fn;
/** optional cleanup */
rayo_actor_cleanup_fn cleanup_fn;
/** incoming message queue */
switch_queue_t *msg_queue;
};
/**
......@@ -125,16 +126,19 @@ struct rayo_component {
#define RAYO_CALL(x) ((struct rayo_call *)x)
#define RAYO_MIXER(x) ((struct rayo_mixer *)x)
extern struct rayo_message *rayo_message_create(iks *xml);
extern struct rayo_message *rayo_message_create_dup(iks *xml);
extern struct rayo_message *rayo_message_create(struct rayo_actor *from, iks *xml, int dup, int reply);
extern void rayo_message_destroy(struct rayo_message *msg);
extern iks *rayo_message_remove_payload(struct rayo_message *msg);
#define RAYO_MESSAGE_CREATE(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 0, 0)
#define RAYO_MESSAGE_CREATE_DUP(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 1, 0)
#define RAYO_REPLY_CREATE(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 0, 1)
#define RAYO_REPLY_CREATE_DUP(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 1, 1)
extern struct rayo_actor *rayo_actor_locate(const char *jid, const char *file, int line);
extern struct rayo_actor *rayo_actor_locate_by_id(const char *id, const char *file, int line);
extern int rayo_actor_seq_next(struct rayo_actor *actor);
extern void rayo_actor_send(struct rayo_actor *from, struct rayo_actor *to, struct rayo_message *msg, const char *file, int line);
extern void rayo_actor_send_by_jid(struct rayo_actor *from, const char *jid, struct rayo_message *msg, const char *file, int line);
extern void rayo_actor_send(const char *jid, struct rayo_message *msg, const char *file, int line);
extern void rayo_actor_rdlock(struct rayo_actor *actor, const char *file, int line);
extern void rayo_actor_unlock(struct rayo_actor *actor, const char *file, int line);
extern void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int line);
......@@ -150,19 +154,18 @@ extern void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int l
#define RAYO_UNLOCK(x) rayo_actor_unlock(RAYO_ACTOR(x), __FILE__, __LINE__)
#define RAYO_DESTROY(x) rayo_actor_destroy(RAYO_ACTOR(x), __FILE__, __LINE__)
#define RAYO_SEQ_NEXT(x) rayo_actor_seq_next(RAYO_ACTOR(x))
#define RAYO_SEND(from, to, msg) rayo_actor_send(RAYO_ACTOR(from), RAYO_ACTOR(to), msg, __FILE__, __LINE__)
#define RAYO_SEND_BY_JID(from, jid, msg) rayo_actor_send_by_jid(RAYO_ACTOR(from), jid, msg, __FILE__, __LINE__)
#define RAYO_SEND(to, msg) rayo_actor_send(to, msg, __FILE__, __LINE__)
extern const char *rayo_call_get_dcp_jid(struct rayo_call *call);
#define rayo_mixer_get_name(mixer) RAYO_ID(mixer)
#define rayo_component_init(component, pool, type, id, parent, client_jid) _rayo_component_init(component, pool, type, id, parent, client_jid, __FILE__, __LINE__)
extern struct rayo_component *_rayo_component_init(struct rayo_component *component, switch_memory_pool_t *pool, const char *type, const char *id, struct rayo_actor *parent, const char *client_jid, const char *file, int line);
#define rayo_component_init(component, pool, type, subtype, id, parent, client_jid) _rayo_component_init(component, pool, type, subtype, id, parent, client_jid, __FILE__, __LINE__)
extern struct rayo_component *_rayo_component_init(struct rayo_component *component, switch_memory_pool_t *pool, const char *type, const char *subtype, const char *id, struct rayo_actor *parent, const char *client_jid, const char *file, int line);
typedef iks *(*rayo_actor_xmpp_handler)(struct rayo_actor *, struct rayo_actor *, iks *, void *);
extern void rayo_actor_command_handler_add(enum rayo_actor_type type, const char *subtype, const char *name, rayo_actor_xmpp_handler fn);
extern void rayo_actor_event_handler_add(enum rayo_actor_type from_type, const char *from_subtype, enum rayo_actor_type to_type, const char *to_subtype, const char *name, rayo_actor_xmpp_handler fn);
typedef iks *(*rayo_actor_xmpp_handler)(struct rayo_actor *, struct rayo_message *, void *);
extern void rayo_actor_command_handler_add(const char *type, const char *subtype, const char *name, rayo_actor_xmpp_handler fn);
extern void rayo_actor_event_handler_add(const char *from_type, const char *from_subtype, const char *to_type, const char *to_subtype, const char *name, rayo_actor_xmpp_handler fn);
#endif
......
......@@ -40,7 +40,7 @@
struct rayo_component *rayo_component_locate(const char *id, const char *file, int line)
{
struct rayo_actor *actor = rayo_actor_locate_by_id(id, file, line);
if (actor && (actor->type == RAT_MIXER_COMPONENT || actor->type == RAT_CALL_COMPONENT)) {
if (actor && !strncmp(RAT_COMPONENT, actor->type, strlen(RAT_COMPONENT))) {
return RAYO_COMPONENT(actor);
} else if (actor) {
RAYO_UNLOCK(actor);
......@@ -63,7 +63,7 @@ void rayo_component_send_start(struct rayo_component *component, iks *iq)
#else
iks_insert_attrib_printf(ref, "uri", "xmpp:%s", RAYO_JID(component));
#endif
RAYO_SEND_BY_JID(component, iks_find_attrib(response, "to"), rayo_message_create(response));
RAYO_SEND(iks_find_attrib(response, "to"), RAYO_REPLY_CREATE(component, response));
}
/**
......@@ -116,7 +116,7 @@ iks *rayo_component_create_complete_event(struct rayo_component *component, cons
*/
void rayo_component_send_complete_event(struct rayo_component *component, iks *response)
{
RAYO_SEND_BY_JID(component, iks_find_attrib(response, "to"), rayo_message_create(response));
RAYO_SEND(iks_find_attrib(response, "to"), RAYO_REPLY_CREATE(component, response));
RAYO_UNLOCK(component);
RAYO_DESTROY(component);
}
......
......@@ -187,7 +187,7 @@ static void send_barge_event(struct rayo_component *component)
iks_insert_attrib(event, "to", component->client_jid);
x = iks_insert(event, "start-of-input");
iks_insert_attrib(x, "xmlns", RAYO_INPUT_NS);
RAYO_SEND_BY_JID(component, component->client_jid, rayo_message_create(event));
RAYO_SEND(component->client_jid, RAYO_REPLY_CREATE(component, event));
}
/**
......@@ -466,8 +466,9 @@ static iks *start_call_input(struct input_component *component, switch_core_sess
/**
* Start execution of input component
*/
static iks *start_call_input_component(struct rayo_actor *client, struct rayo_actor *call, iks *iq, void *session_data)
static iks *start_call_input_component(struct rayo_actor *call, struct rayo_message *msg, void *session_data)
{
iks *iq = msg->payload;
switch_core_session_t *session = (switch_core_session_t *)session_data;
char *component_id = switch_mprintf("%s-input", switch_core_session_get_uuid(session));
switch_memory_pool_t *pool = NULL;
......@@ -482,7 +483,7 @@ static iks *start_call_input_component(struct rayo_actor *client, struct rayo_ac
/* create component */
switch_core_new_memory_pool(&pool);
input_component = switch_core_alloc(pool, sizeof(*input_component));
rayo_component_init(RAYO_COMPONENT(input_component), pool, "input", component_id, call, iks_find_attrib(iq, "from"));
rayo_component_init(RAYO_COMPONENT(input_component), pool, RAT_CALL_COMPONENT, "input", component_id, call, iks_find_attrib(iq, "from"));
switch_safe_free(component_id);
/* start input */
......@@ -492,8 +493,9 @@ static iks *start_call_input_component(struct rayo_actor *client, struct rayo_ac
/**
* Stop execution of input component
*/
static iks *stop_call_input_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *stop_call_input_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
struct input_component *input_component = INPUT_COMPONENT(component);
if (input_component && !input_component->stop) {
......@@ -518,8 +520,9 @@ static iks *stop_call_input_component(struct rayo_actor *client, struct rayo_act
/**
* Start input component timers
*/
static iks *start_timers_call_input_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *start_timers_call_input_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
struct input_component *input_component = INPUT_COMPONENT(component);
if (input_component) {
switch_core_session_t *session = switch_core_session_locate(RAYO_COMPONENT(component)->parent->id);
......
......@@ -57,14 +57,14 @@ struct output_component {
/**
* Create new output component
*/
static struct rayo_component *create_output_component(struct rayo_actor *actor, iks *output, const char *client_jid)
static struct rayo_component *create_output_component(struct rayo_actor *actor, const char *type, iks *output, const char *client_jid)
{
switch_memory_pool_t *pool;
struct output_component *output_component = NULL;
switch_core_new_memory_pool(&pool);
output_component = switch_core_alloc(pool, sizeof(*output_component));
rayo_component_init((struct rayo_component *)output_component, pool, "output", NULL, actor, client_jid);
rayo_component_init((struct rayo_component *)output_component, pool, type, "output", NULL, actor, client_jid);
output_component->document = iks_copy(output);
output_component->repeat_interval = iks_find_int_attrib(output, "repeat-interval");
......@@ -120,8 +120,9 @@ static iks *start_call_output(struct rayo_component *component, switch_core_sess
/**
* Start execution of call output component
*/
static iks *start_call_output_component(struct rayo_actor *client, struct rayo_actor *call, iks *iq, void *session_data)
static iks *start_call_output_component(struct rayo_actor *call, struct rayo_message *msg, void *session_data)
{
iks *iq = msg->payload;
switch_core_session_t *session = (switch_core_session_t *)session_data;
struct rayo_component *output_component = NULL;
iks *output = iks_find(iq, "output");
......@@ -131,15 +132,16 @@ static iks *start_call_output_component(struct rayo_actor *client, struct rayo_a
return iks_new_error(iq, STANZA_ERROR_BAD_REQUEST);
}
output_component = create_output_component(call, output, iks_find_attrib(iq, "from"));
output_component = create_output_component(call, RAT_CALL_COMPONENT, output, iks_find_attrib(iq, "from"));
return start_call_output(output_component, session, output, iq);
}
/**
* Start execution of mixer output component
*/
static iks *start_mixer_output_component(struct rayo_actor *client, struct rayo_actor *mixer, iks *iq, void *data)
static iks *start_mixer_output_component(struct rayo_actor *mixer, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
struct rayo_component *component = NULL;
iks *output = iks_find(iq, "output");
switch_stream_handle_t stream = { 0 };
......@@ -149,7 +151,7 @@ static iks *start_mixer_output_component(struct rayo_actor *client, struct rayo_
return iks_new_error(iq, STANZA_ERROR_BAD_REQUEST);
}
component = create_output_component(mixer, output, iks_find_attrib(iq, "from"));
component = create_output_component(mixer, RAT_MIXER_COMPONENT, output, iks_find_attrib(iq, "from"));
/* build conference command */
SWITCH_STANDARD_STREAM(stream);
......@@ -174,8 +176,9 @@ static iks *start_mixer_output_component(struct rayo_actor *client, struct rayo_
/**
* Stop execution of output component
*/
static iks *stop_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *stop_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s stop", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -190,8 +193,9 @@ static iks *stop_output_component(struct rayo_actor *client, struct rayo_actor *
/**
* Pause execution of output component
*/
static iks *pause_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *pause_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s pause", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -205,8 +209,9 @@ static iks *pause_output_component(struct rayo_actor *client, struct rayo_actor
/**
* Resume execution of output component
*/
static iks *resume_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *resume_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s resume", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -220,8 +225,9 @@ static iks *resume_output_component(struct rayo_actor *client, struct rayo_actor
/**
* Speed up execution of output component
*/
static iks *speed_up_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *speed_up_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s speed:+", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -235,8 +241,9 @@ static iks *speed_up_output_component(struct rayo_actor *client, struct rayo_act
/**
* Slow down execution of output component
*/
static iks *speed_down_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *speed_down_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s speed:-", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -250,8 +257,9 @@ static iks *speed_down_output_component(struct rayo_actor *client, struct rayo_a
/**
* Increase volume of output component
*/
static iks *volume_up_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *volume_up_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s volume:+", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -265,8 +273,9 @@ static iks *volume_up_output_component(struct rayo_actor *client, struct rayo_ac
/**
* Lower volume of output component
*/
static iks *volume_down_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *volume_down_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s volume:-", RAYO_JID(component));
SWITCH_STANDARD_STREAM(stream);
......@@ -280,8 +289,9 @@ static iks *volume_down_output_component(struct rayo_actor *client, struct rayo_
/**
* Seek output component
*/
static iks *seek_output_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *seek_output_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
iks *seek = iks_find(iq, "seek");
if (VALIDATE_RAYO_OUTPUT_SEEK(seek)) {
......
......@@ -144,7 +144,7 @@ static void on_call_record_stop_event(switch_event_t *event)
/**
* Create a record component
*/
static struct rayo_component *record_component_create(struct rayo_actor *actor, const char *client_jid, iks *record)
static struct rayo_component *record_component_create(struct rayo_actor *actor, const char *type, const char *client_jid, iks *record)
{
switch_memory_pool_t *pool;
struct record_component *record_component = NULL;
......@@ -171,7 +171,7 @@ static struct rayo_component *record_component_create(struct rayo_actor *actor,
switch_core_new_memory_pool(&pool);
record_component = switch_core_alloc(pool, sizeof(*record_component));
rayo_component_init(RAYO_COMPONENT(record_component), pool, "record", fs_file_path, actor, client_jid);
rayo_component_init(RAYO_COMPONENT(record_component), pool, type, "record", fs_file_path, actor, client_jid);
record_component->max_duration = iks_find_int_attrib(record, "max-duration");
record_component->initial_timeout = iks_find_int_attrib(record, "initial-timeout");
record_component->final_timeout = iks_find_int_attrib(record, "final-timeout");
......@@ -259,13 +259,14 @@ static int start_call_record(switch_core_session_t *session, struct rayo_compone
/**
* Start execution of call record component
*/
static iks *start_call_record_component(struct rayo_actor *client, struct rayo_actor *call, iks *iq, void *session_data)
static iks *start_call_record_component(struct rayo_actor *call, struct rayo_message *msg, void *session_data)
{
iks *iq = msg->payload;
switch_core_session_t *session = (switch_core_session_t *)session_data;
struct rayo_component *component = NULL;
iks *record = iks_find(iq, "record");
component = record_component_create(call, iks_find_attrib(iq, "from"), record);
component = record_component_create(call, RAT_CALL_COMPONENT, iks_find_attrib(iq, "from"), record);
if (!component) {
return iks_new_error(iq, STANZA_ERROR_BAD_REQUEST);
}
......@@ -284,8 +285,9 @@ static iks *start_call_record_component(struct rayo_actor *client, struct rayo_a
/**
* Stop execution of record component
*/
static iks *stop_call_record_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *stop_call_record_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_core_session_t *session = switch_core_session_locate(RAYO_COMPONENT(component)->parent->id);
if (session) {
RECORD_COMPONENT(component)->stop = 1;
......@@ -298,8 +300,9 @@ static iks *stop_call_record_component(struct rayo_actor *client, struct rayo_ac
/**
* Pause execution of record component
*/
static iks *pause_record_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *pause_record_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
struct record_component *record = RECORD_COMPONENT(component);
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s pause", record->local_file_path);
......@@ -319,8 +322,9 @@ static iks *pause_record_component(struct rayo_actor *client, struct rayo_actor
/**
* Resume execution of record component
*/
static iks *resume_record_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *resume_record_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
struct record_component *record = RECORD_COMPONENT(component);
switch_stream_handle_t stream = { 0 };
char *command = switch_mprintf("%s resume", record->local_file_path);
......@@ -381,12 +385,13 @@ static int start_mixer_record(struct rayo_component *component)
/**
* Start execution of mixer record component
*/
static iks *start_mixer_record_component(struct rayo_actor *client, struct rayo_actor *mixer, iks *iq, void *data)
static iks *start_mixer_record_component(struct rayo_actor *mixer, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
struct rayo_component *component = NULL;
iks *record = iks_find(iq, "record");
component = record_component_create(mixer, iks_find_attrib(iq, "from"), record);
component = record_component_create(mixer, RAT_MIXER_COMPONENT, iks_find_attrib(iq, "from"), record);
if (!component) {
return iks_new_error(iq, STANZA_ERROR_BAD_REQUEST);
}
......@@ -412,8 +417,9 @@ static iks *start_mixer_record_component(struct rayo_actor *client, struct rayo_
/**
* Stop execution of record component
*/
static iks *stop_mixer_record_component(struct rayo_actor *client, struct rayo_actor *component, iks *iq, void *data)
static iks *stop_mixer_record_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
char *args;
switch_stream_handle_t stream = { 0 };
SWITCH_STANDARD_STREAM(stream);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论