提交 102e5544 authored 作者: William King's avatar William King

Merge pull request #121 in FS/freeswitch from mod_amqp to master

* commit '1a96f23f':
  Adding mod_amqp as an event_handler module
......@@ -94,6 +94,7 @@ endpoints/mod_skinny
#endpoints/mod_skypopen
endpoints/mod_sofia
#endpoints/mod_unicall
#event_handlers/mod_amqp
event_handlers/mod_cdr_csv
#event_handlers/mod_cdr_mongodb
#event_handlers/mod_cdr_pg_csv
......
<configuration name="amqp.conf" description="mod_amqp">
<producers>
<profile name="default">
<connections>
<connection name="primary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5673"/>
<param name="heartbeat" value="0"/>
</connection>
<connection name="secondary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5672"/>
<param name="heartbeat" value="0"/>
</connection>
</connections>
<params>
<param name="exchange" value="TAP.Events"/>
<param name="exchange_type" value="topic"/>
<param name="circuit_breaker_ms" value="10000"/>
<param name="reconnect_interval_ms" value="1000"/>
<param name="send_queue_size" value="5000"/>
<!-- The routing key is made from the format string, using the header values in the event specified in the format_fields.-->
<!-- Fields that are prefixed with a # are treated as literals rather than doing a header lookup -->
<param name="format_fields" value="#FreeSWITCH,FreeSWITCH-Hostname,Event-Name,Event-Subclass,Unique-ID"/>
<!-- <param name="eventFilter" value="SWITCH_EVENT_ALL"/> -->
<param name="event_filter" value="SWITCH_EVENT_CHANNEL_CREATE,SWITCH_EVENT_CHANNEL_DESTROY,SWITCH_EVENT_HEARTBEAT,SWITCH_EVENT_DTMF"/>
</params>
</profile>
</producers>
<commands>
<profile name="default">
<connections>
<connection name="primary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5672"/>
<param name="heartbeat" value="0"/>
</connection>
</connections>
<params>
<param name="eventExchange" value="TAP.Events"/>
<param name="eventExchangetype" value="topic"/>
</params>
</profile>
</commands>
</configuration>
......@@ -1227,6 +1227,10 @@ PKG_CHECK_MODULES([MEMCACHED], [libmemcached >= 0.31],[
AM_CONDITIONAL([HAVE_MEMCACHED],[false])
])
PKG_CHECK_MODULES([AMQP], [librabbitmq >= 0.5.2],[
AM_CONDITIONAL([HAVE_AMQP],[true])],[
AC_MSG_RESULT([no]); AM_CONDITIONAL([HAVE_AMQP],[false])])
AC_ARG_ENABLE(core-libedit-support,
[AS_HELP_STRING([--disable-core-libedit-support], [Compile without libedit Support])])
......@@ -1572,6 +1576,7 @@ AC_CONFIG_FILES([Makefile
src/mod/endpoints/mod_unicall/Makefile
src/mod/endpoints/mod_rtc/Makefile
src/mod/endpoints/mod_verto/Makefile
src/mod/event_handlers/mod_amqp/Makefile
src/mod/event_handlers/mod_cdr_csv/Makefile
src/mod/event_handlers/mod_cdr_mongodb/Makefile
src/mod/event_handlers/mod_cdr_pg_csv/Makefile
......
include $(top_srcdir)/build/modmake.rulesam
MODNAME=mod_amqp
if HAVE_AMQP
mod_LTLIBRARIES = mod_amqp.la
mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp.c
mod_amqp_la_CFLAGS = $(AM_CFLAGS) $(AMQP_CFLAGS)
mod_amqp_la_LIBADD = $(switch_builddir)/libfreeswitch.la
mod_amqp_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(AMQP_LIBS) $(SWITCH_AM_LDFLAGS)
else
install: error
all: error
error:
$(error You must install librabbitmq1 and librabbitmq-dev to build this module)
endif
_
_ __ ___ ___ __| | __ _ _ __ ___ __ _ _ __
| '_ ` _ \ / _ \ / _` | / _` | '_ ` _ \ / _` | '_ \
| | | | | | (_) | (_| | | (_| | | | | | | (_| | |_) |
|_| |_| |_|\___/ \__,_|___\__,_|_| |_| |_|\__, | .__/
|_____| |_|_| by Aeriandi
Contents
--------
1. Features
2. How to build and install
3. Configuration
4. Usage
5. Trouleshooting
6. Notes
1. Features
-----------
* Authenticates with an AMQP broker such as RabbitMQ.
* If the broker disconnects, the connection is retried.
* Routing keys can include values from Freeswitch message headers.
* The rate of messages pulished can be limited by filtering event types.
* Messages are sent asynchronously so as not to block the Freeswitch core.
* Pulishing can be temporarily suspended on the event of "back pressure" from the AMQP broker.
2. How to build and install
---------------------------
Requires librabbitmq1 to build. Debian Jessie comes with the correct version.
3. Configuration
----------------
All configuration is done within the amqp.conf.xml file located in the freeswitch/autoload_configs folder, which is usually in /etc/ for linux based machines.
The file is of the format:
<configuration name="amqp.conf" description="mod_amqp">
<settings>
<param name="parameter1" value="value1"/>
<param name="parameter2" value="value2"/>
...etc.
</settings>
</configuration>
Available parameters are as follows:
+------------------------------+-----------------------------------+
| name | default value (units) |
|------------------------------+-----------------------------------|
| amqpHostnames | localhost |
| amqpVirtualHost | / |
| amqpPort | 5672 |
| amqpUsername | guest |
| amqpPassword | guest |
| amqpHeartbeatSeconds | 0 (s) |
| eventExchange | TAP.Events |
| eventExchangetype | topic |
| eventRoutingKeyFormat | %s.%s.%s.%s |
| eventRoutingKeyFormatFields | FreeSWITCH-Hostname,Event-Name, |
| | Event-Subclass,Unique-ID |
| eventFilter | SWITCH_EVENT_CHANNEL_CREATE, |
| | SWITCH_EVENT_CHANNEL_DESTROY, |
| | SWITCH_EVENT_HEARTBEAT, |
| | SWITCH_EVENT_DTMF |
| commandExchange | TAP.Commands |
| commandExchangeType | topic |
| commandBindingKey | TapCommands |
| amqpSendQueueSize | 500 (events) |
| amqpCircuitBreakerTimeout | 10000 (ms) |
| amqpReconnectInterval | 1000 (ms) |
+------------------------------+-----------------------------------+
Set the amqpHostname and amqpPort to point to the AMQP broker, and set valid login credentials using amqpUsername and amqpPassword.
The routing key is made from the eventRoutingKeyFormat format string using the freeswitch event header values specified in the eventRoutingKeyFormatFields. See the manpage printf(1) for more information about format strings. The numer of percent marks in the format string must match the number of comma-separated header names in the format fields string.
mod_amqp has an internal buffer for events so that it can send them asynchronously and also cope with the connection going down for a short amount of time. The size of this buffer is set by amqpSendQueueSize. If this buffer ever becomes full, then mod_amqp will drop event messages for the period of time specified by amqpCircuitBreakerTimeout (in milliseconds).
If the connection to the AMQP broker is severed, mod_amqp will attempt to reconnect regularly according to the amqpReconnectInterval (in milliseconds). It will cycle through the hostnames provided in amqpHostnames.
The eventFilter parameter specifies which events will be sent to the AMQP broker, a full list of available options can be found in src/include/switch_types.h. The special event name SWITCH_EVENT_ALL causes all events to be sent, effectively disabling the filter.
4. Usage
--------
Usually, mod_amqp will be loaded automatically when Freeswitch starts. To establish whether the module has been loaded, you can execute "module_exists mod_amqp" in fs_cli.
If the module is not set to load with Freeswitch, it can be loaded on he fly by executing "load mod_amqp" from fs_cli. You'll see a few lines of status messages as the module loads and tries to connect to the AMQP roker. Conversely, the module can be unloaded using "unload mod_amqp".
To effect new settings having edited the config file, the module should be unloaded then loaded again.
5. Trouleshooting
-----------------
Any errors or warnings will be reported using Freeswitch logging, so check for errors using fs_cli with the loglevel is set to be sufficiently verbose, or with your selected logging module; for example, the syslog logger.
Typically, messages not being received by the AMQP broker is due to network connectivity or failed authentication with the broker.
If mod_amqp experiences back-pressure from the AMQP broker, its internal buffer of events to send fills up. When this buffer is half full, warning messages are logged, and when the queue is completely full the circuit breaker will be triggered, logging an error and dropping events for the predefined amount of time.
6. Notes
--------
The SHA for the revision of librabbitmq-c1 that is included is 1c213703c9fdd747bc71ea4f64943c3b4269f8cf.
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown);
SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load);
SWITCH_MODULE_DEFINITION(mod_amqp, mod_amqp_load, mod_amqp_shutdown, NULL);
SWITCH_STANDARD_API(amqp_reload)
{
return mod_amqp_do_config(SWITCH_TRUE);
}
/* ------------------------------
Startup
------------------------------
*/
SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load)
{
switch_api_interface_t *api_interface;
memset(&globals, 0, sizeof(globals));
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
globals.pool = pool;
switch_core_hash_init(&(globals.producer_hash));
switch_core_hash_init(&(globals.command_hash));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_apqp loading: Version %s\n", switch_version_full());
/* Create producer profiles */
if ( mod_amqp_do_config(SWITCH_FALSE) != SWITCH_STATUS_SUCCESS ){
return SWITCH_STATUS_GENERR;
}
SWITCH_ADD_API(api_interface, "amqp", "amqp API", amqp_reload, "syntax");
return SWITCH_STATUS_SUCCESS;
}
/* ------------------------------
Shutdown
------------------------------
*/
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown)
{
switch_hash_index_t *hi;
mod_amqp_producer_profile_t *producer;
mod_amqp_command_profile_t *command;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod starting shutting down\n");
switch_event_unbind_callback(mod_amqp_producer_event_handler);
for (hi = switch_core_hash_first(globals.producer_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, (void **)&producer);
mod_amqp_producer_destroy(&producer);
}
for (hi = switch_core_hash_first(globals.command_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, (void **)&command);
mod_amqp_command_destroy(&command);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod finished shutting down\n");
return SWITCH_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#ifndef MOD_AMQP_H
#define MOD_AMQP_H
#include <switch.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
#include <strings.h>
#define MAX_LOG_MESSAGE_SIZE 1024
#define AMQP_MAX_HOSTS 4
/* If you change MAX_ROUTING_KEY_FORMAT_FIELDS then you must change the implementation of makeRoutingKey where it formats the routing key using sprintf */
#define MAX_ROUTING_KEY_FORMAT_FIELDS 10
#define MAX_AMQP_ROUTING_KEY_LENGTH 255
#define TIME_STATS_TO_AGGREGATE 1024
#define MOD_AMQP_DEBUG_TIMING 0
typedef struct {
char routing_key[MAX_AMQP_ROUTING_KEY_LENGTH];
char *pjson;
} mod_amqp_message_t;
typedef struct mod_amqp_connection_s {
char *name;
char *hostname;
char *virtualhost;
char *username;
char *password;
unsigned int port;
unsigned int heartbeat; /* in seconds */
amqp_connection_state_t state;
struct mod_amqp_connection_s *next;
} mod_amqp_connection_t;
typedef struct {
char *name;
char *exchange;
char *exchange_type;
char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
/* Array to store the possible event subscriptions */
int event_subscriptions;
switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
switch_event_types_t event_ids[SWITCH_EVENT_ALL];
switch_event_node_t *eventNode;
/* Because only the 'running' thread will be reading or writing to the two connection pointers
* this does not 'yet' need a read/write lock. Before these structures can be destroyed,
* the running thread must be joined first.
*/
mod_amqp_connection_t *conn_root;
mod_amqp_connection_t *conn_active;
/* Rabbit connections are not thread safe so one connection per thread.
Communicate with sender thread using a queue */
switch_thread_t *producer_thread;
switch_queue_t *send_queue;
unsigned int send_queue_size;
int reconnect_interval_ms;
int circuit_breaker_ms;
switch_time_t circuit_breaker_reset_time;
switch_bool_t running;
switch_memory_pool_t *pool;
char *custom_attr;
} mod_amqp_producer_profile_t;
typedef struct {
char *name;
char *exchange;
char *exchange_type;
char *binding_key;
/* Array to store the possible event subscriptions */
char *event_filter;
unsigned int number_of_event_filters;
switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
switch_event_types_t event_ids[SWITCH_EVENT_ALL];
/* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */
mod_amqp_connection_t *conn_root;
mod_amqp_connection_t *conn_active;
int reconnect_interval_ms;
/* Listener thread */
switch_thread_t *command_thread;
switch_mutex_t *mutex;
switch_bool_t running;
switch_memory_pool_t *pool;
char *custom_attr;
} mod_amqp_command_profile_t;
struct {
switch_memory_pool_t *pool;
switch_hash_t *producer_hash;
switch_hash_t *command_hash;
} globals;
/* utils */
switch_status_t mod_amqp_do_config(switch_bool_t reload);
int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context);
int mod_amqp_count_chars(const char* string, char ch);
/* connection */
switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool);
void mod_amqp_connection_destroy(mod_amqp_connection_t **conn);
void mod_amqp_connection_close(mod_amqp_connection_t *connection);
switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr);
/* command */
switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **profile);
switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg);
void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void *data);
/* producer */
void mod_amqp_producer_event_handler(switch_event_t* evt);
switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],switch_event_t* evt, char* routingKeyEventHeaderNames[]);
switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);
#endif /* MOD_AMQP_H */
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
void mod_amqp_connection_close(mod_amqp_connection_t *connection)
{
amqp_connection_state_t old_state = connection->state;
int status = 0;
connection->state = NULL;
if (old_state != NULL) {
mod_amqp_log_if_amqp_error(amqp_channel_close(old_state, 1, AMQP_REPLY_SUCCESS), "Closing channel");
mod_amqp_log_if_amqp_error(amqp_connection_close(old_state, AMQP_REPLY_SUCCESS), "Closing connection");
if ((status = amqp_destroy_connection(old_state))) {
const char *errstr = amqp_error_string2(-status);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error destroying amqp connection: %s\n", errstr);
}
}
}
switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr)
{
int channel_max = 0;
int frame_max = 131072;
amqp_table_t loginProperties;
amqp_table_entry_t loginTableEntries[5];
char hostname[64];
int bHasHostname;
char key_string[256] = {0};
amqp_rpc_reply_t status;
amqp_socket_t *socket = NULL;
int amqp_status = -1;
mod_amqp_connection_t *connection_attempt = NULL;
amqp_connection_state_t newConnection = amqp_new_connection();
amqp_connection_state_t oldConnection = NULL;
if (active && *active) {
oldConnection = (*active)->state;
}
/* Set up meta data for connection */
bHasHostname = gethostname(hostname, sizeof(hostname)) == 0;
loginProperties.num_entries = sizeof(loginTableEntries)/sizeof(*loginTableEntries);
loginProperties.entries = loginTableEntries;
snprintf(key_string, 256, "x_%s_HostMachineName", custom_attr);
loginTableEntries[0].key = amqp_cstring_bytes(key_string);
loginTableEntries[0].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[0].value.value.bytes = amqp_cstring_bytes(bHasHostname ? hostname : "(unknown)");
snprintf(key_string, 256, "x_%s_ProcessDescription", custom_attr);
loginTableEntries[1].key = amqp_cstring_bytes(key_string);
loginTableEntries[1].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[1].value.value.bytes = amqp_cstring_bytes("FreeSwitch");
snprintf(key_string, 256, "x_%s_ProcessType", custom_attr);
loginTableEntries[2].key = amqp_cstring_bytes(key_string);
loginTableEntries[2].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[2].value.value.bytes = amqp_cstring_bytes("TAP");
snprintf(key_string, 256, "x_%s_ProcessBuildVersion", custom_attr);
loginTableEntries[3].key = amqp_cstring_bytes(key_string);
loginTableEntries[3].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[3].value.value.bytes = amqp_cstring_bytes(switch_version_full());
snprintf(key_string, 256, "x_%s_Liquid_ProcessBuildBornOn", custom_attr);
loginTableEntries[4].key = amqp_cstring_bytes(key_string);
loginTableEntries[4].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[4].value.value.bytes = amqp_cstring_bytes(__DATE__ " " __TIME__);
if (!(socket = amqp_tcp_socket_new(newConnection))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not create TCP socket\n");
return SWITCH_STATUS_GENERR;
}
connection_attempt = connections;
amqp_status = -1;
while (connection_attempt && amqp_status){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] trying to connect to AMQP broker %s:%d\n",
profile_name, connection_attempt->hostname, connection_attempt->port);
if ((amqp_status = amqp_socket_open(socket, connection_attempt->hostname, connection_attempt->port))){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n",
connection_attempt->hostname, connection_attempt->port, amqp_status, amqp_error_string2(amqp_status));
connection_attempt = connection_attempt->next;
}
}
if (!connection_attempt) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] could not connect to any AMQP brokers\n", profile_name);
return SWITCH_STATUS_GENERR;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] opened socket connection to AMQP broker %s:%d\n",
profile_name, connection_attempt->hostname, connection_attempt->port);
*active = connection_attempt;
/* We have a connection, now log in */
status = amqp_login_with_properties(newConnection,
connection_attempt->virtualhost,
channel_max,
frame_max,
connection_attempt->heartbeat,
&loginProperties,
AMQP_SASL_METHOD_PLAIN,
connection_attempt->username,
connection_attempt->password);
if (mod_amqp_log_if_amqp_error(status, "Logging in")) {
return SWITCH_STATUS_GENERR;
}
// Open a channel (1). This is fairly standard
amqp_channel_open(newConnection, 1);
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(newConnection), "Opening channel")) {
return SWITCH_STATUS_GENERR;
}
(*active)->state = newConnection;
if (oldConnection) {
amqp_destroy_connection(oldConnection);
}
return SWITCH_STATUS_SUCCESS;
}
switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool)
{
mod_amqp_connection_t *new_con = switch_core_alloc(pool, sizeof(mod_amqp_connection_t));
switch_xml_t param;
char *name = (char *) switch_xml_attr_soft(cfg, "name");
char *hostname = NULL, *virtualhost = NULL, *username = NULL, *password = NULL;
unsigned int port = 0, heartbeat = 0;
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", switch_xml_toxml(cfg, 1));
return SWITCH_STATUS_GENERR;
}
new_con->name = switch_core_strdup(pool, name);
new_con->state = NULL;
for (param = switch_xml_child(cfg, "param"); param; param = param->next) {
char *var = (char *) switch_xml_attr_soft(param, "name");
char *val = (char *) switch_xml_attr_soft(param, "value");
if (!var) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param missing 'name' attribute\n", name);
continue;
}
if (!val) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param[%s] missing 'value' attribute\n", name, var);
continue;
}
if (!strncmp(var, "hostname", 8)) {
hostname = switch_core_strdup(pool, val);
} else if (!strncmp(var, "virtualhost", 11)) {
virtualhost = switch_core_strdup(pool, val);
} else if (!strncmp(var, "username", 8)) {
username = switch_core_strdup(pool, val);
} else if (!strncmp(var, "password", 8)) {
password = switch_core_strdup(pool, val);
} else if (!strncmp(var, "port", 4)) {
int interval = atoi(val);
if (interval && interval > 0) {
port = interval;
}
} else if (!strncmp(var, "heartbeat", 4)) {
int interval = atoi(val);
if (interval && interval > 0) {
heartbeat = interval;
}
}
}
new_con->hostname = hostname ? hostname : "localhost";
new_con->virtualhost = virtualhost ? virtualhost : "/";
new_con->username = username ? username : "guest";
new_con->password = password ? password : "guest";
new_con->port = port ? port : 5672;
new_con->heartbeat = heartbeat ? heartbeat : 0;
*conn = new_con;
return SWITCH_STATUS_SUCCESS;
}
void mod_amqp_connection_destroy(mod_amqp_connection_t **conn)
{
if (conn && *conn) {
*conn = NULL;
}
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context)
{
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return 0;
case AMQP_RESPONSE_NONE:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: server connection error %d, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: server channel error %d, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
}
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: unknown reply_type: %d \n", context, x.reply_type);
break;
}
return -1;
}
int mod_amqp_count_chars(const char* string, char ch)
{
int c = 0;
while (*string) c += *(string++) == ch;
return c;
}
switch_status_t mod_amqp_do_config(switch_bool_t reload)
{
switch_xml_t cfg = NULL, xml = NULL, profiles = NULL, profile = NULL;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, reload ? "Reloading Config\n" : "Loading Config\n");
if (!(xml = switch_xml_open_cfg("amqp.conf", &cfg, NULL))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "open of amqp.conf.xml failed\n");
return SWITCH_STATUS_FALSE;
}
if ((profiles = switch_xml_child(cfg, "producers"))) {
if ((profile = switch_xml_child(profiles, "profile"))) {
for (; profile; profile = profile->next) {
char *name = (char *) switch_xml_attr_soft(profile, "name");
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n");
continue;
}
if ( mod_amqp_producer_create(name, profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name);
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" );
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate producers section for mod_amqp\n" );
}
if ((profiles = switch_xml_child(cfg, "commands"))) {
if ((profile = switch_xml_child(profiles, "profile"))) {
for (; profile; profile = profile->next) {
char *name = (char *) switch_xml_attr_soft(profile, "name");
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n");
continue;
}
name = switch_core_strdup(globals.pool, name);
if ( mod_amqp_command_create(name, profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name);
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" );
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate commands section for mod_amqp\n" );
}
return SWITCH_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论