提交 e83af319 authored 作者: Josh Perry's avatar Josh Perry

Updated message creation

上级 2b1793ed
#include <switch.h> #include <switch.h>
#include <zmq.hpp> #include <zmq.hpp>
#include <string>
#include <exception> #include <exception>
#include <stdexcept> #include <stdexcept>
#include <memory> #include <memory>
...@@ -15,13 +14,6 @@ extern "C" { ...@@ -15,13 +14,6 @@ extern "C" {
SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime); SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime);
}; };
class ZmqStringMessage : public zmq::message_t {
public:
ZmqStringMessage(const std::string &msg) {
}
};
// Handles publishing events out to clients // Handles publishing events out to clients
class ZmqEventPublisher { class ZmqEventPublisher {
public: public:
...@@ -29,19 +21,28 @@ public: ...@@ -29,19 +21,28 @@ public:
context(1), context(1),
event_publisher(context, ZMQ_PUB) event_publisher(context, ZMQ_PUB)
{ {
event_publisher.bind("tcp://*.5556"); event_publisher.bind("tcp://*:5556");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n");
} }
void PublishEvent(const switch_event_t *event) { void PublishEvent(const switch_event_t *event) {
// Serialize the event into a JSON string
char* pjson; char* pjson;
switch_event_serialize_json(const_cast<switch_event_t*>(event), &pjson); switch_event_serialize_json(const_cast<switch_event_t*>(event), &pjson);
std::auto_ptr<char> json(pjson);
ZmqStringMessage msg(json.get()); // Use the JSON string as the message body
zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL);
// Send the message
event_publisher.send(msg); event_publisher.send(msg);
} }
private: private:
static void free_message_data(void *data, void *hint) {
free (data);
}
zmq::context_t context; zmq::context_t context;
zmq::socket_t event_publisher; zmq::socket_t event_publisher;
}; };
...@@ -49,25 +50,39 @@ private: ...@@ -49,25 +50,39 @@ private:
// Handles global inititalization and teardown of the module // Handles global inititalization and teardown of the module
class ZmqModule { class ZmqModule {
public: public:
ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) { ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) :
_running(false) {
// Subscribe to all switch events of any subclass // Subscribe to all switch events of any subclass
// Store a pointer to ourself in the user data // Store a pointer to ourself in the user data
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node) if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node)
!= SWITCH_STATUS_SUCCESS) { != SWITCH_STATUS_SUCCESS) {
throw std::runtime_error("Couldn't bind to switch events."); throw std::runtime_error("Couldn't bind to switch events.");
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Subscribed to events\n");
// Create our module interface registration // Create our module interface registration
*module_interface = switch_loadable_module_create_module_interface(pool, modname); *module_interface = switch_loadable_module_create_module_interface(pool, modname);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module loaded\n");
} }
void Listen() { void Listen() {
if(_running)
return;
_publisher.reset(new ZmqEventPublisher()); _publisher.reset(new ZmqEventPublisher());
_running = true;
while(_running) {
switch_yield(100000);
}
} }
~ZmqModule() { ~ZmqModule() {
// Unsubscribe from the switch events // Unsubscribe from the switch events
_running = false;
switch_event_unbind(&_node); switch_event_unbind(&_node);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
} }
private: private:
...@@ -79,11 +94,14 @@ private: ...@@ -79,11 +94,14 @@ private:
module->_publisher->PublishEvent(event); module->_publisher->PublishEvent(event);
} catch(std::exception ex) { } catch(std::exception ex) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what());
} catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown error publishing event via 0MQ\n");
} }
} }
switch_event_node_t *_node; switch_event_node_t *_node;
std::auto_ptr<ZmqEventPublisher> _publisher; std::auto_ptr<ZmqEventPublisher> _publisher;
bool _running;
}; };
//*****************************// //*****************************//
...@@ -99,8 +117,8 @@ SWITCH_MODULE_LOAD_FUNCTION(load) { ...@@ -99,8 +117,8 @@ SWITCH_MODULE_LOAD_FUNCTION(load) {
try { try {
module.reset(new ZmqModule(module_interface, pool)); module.reset(new ZmqModule(module_interface, pool));
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} catch(const std::exception &ex) { } catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module: %s\n", ex.what()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module\n");
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
...@@ -110,15 +128,23 @@ SWITCH_MODULE_RUNTIME_FUNCTION(runtime) { ...@@ -110,15 +128,23 @@ SWITCH_MODULE_RUNTIME_FUNCTION(runtime) {
try { try {
// Begin listening for clients // Begin listening for clients
module->Listen(); module->Listen();
} catch(...) { } } catch(std::exception &ex) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error listening for clients: %s\n", ex.what());
} catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error listening for clients\n");
}
// Tell the switch to stop calling this runtime loop // Tell the switch to stop calling this runtime loop
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_TERM;
} }
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) { SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) {
// Free the module object try {
module.reset(); // Free the module object
module.reset();
} catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module\n");
}
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论