提交 a2921e26 authored 作者: Anthony Minessale's avatar Anthony Minessale

update verto with some fixes and enhancements with reconnection

上级 52435978
...@@ -787,7 +787,7 @@ AC_PROG_GCC_TRADITIONAL ...@@ -787,7 +787,7 @@ AC_PROG_GCC_TRADITIONAL
AC_FUNC_MALLOC AC_FUNC_MALLOC
AC_TYPE_SIGNAL AC_TYPE_SIGNAL
AC_FUNC_STRFTIME AC_FUNC_STRFTIME
AC_CHECK_FUNCS([gethostname vasprintf mmap mlock mlockall usleep getifaddrs timerfd_create getdtablesize posix_openpt]) AC_CHECK_FUNCS([gethostname vasprintf mmap mlock mlockall usleep getifaddrs timerfd_create getdtablesize posix_openpt poll])
AC_CHECK_FUNCS([sched_setscheduler setpriority setrlimit setgroups initgroups]) AC_CHECK_FUNCS([sched_setscheduler setpriority setrlimit setgroups initgroups])
AC_CHECK_FUNCS([wcsncmp setgroups asprintf setenv pselect gettimeofday localtime_r gmtime_r strcasecmp stricmp _stricmp]) AC_CHECK_FUNCS([wcsncmp setgroups asprintf setenv pselect gettimeofday localtime_r gmtime_r strcasecmp stricmp _stricmp])
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<html> <html>
<head> <head>
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="css/jquery.mobile-1.3.2.min.css" /> <link rel="stylesheet" href="//ajax.googleapis.com/ajax/libs/jquerymobile/1.4.3/jquery.mobile.min.css"/>
<link rel="stylesheet" type="text/css" href="css/jsontable.css" /> <link rel="stylesheet" type="text/css" href="css/jsontable.css" />
<link rel="shortcut icon" href="favicon.ico" /> <link rel="shortcut icon" href="favicon.ico" />
<meta charset="utf-8" /> <meta charset="utf-8" />
...@@ -143,7 +143,7 @@ ...@@ -143,7 +143,7 @@
<div id="errordisplay" style="font-weight:bold;font-size:18px;color:#ae0000"></div> <div id="errordisplay" style="font-weight:bold;font-size:18px;color:#ae0000"></div>
</div> </div>
<div id="online" align="center"> <div id="online" align="center" style="width:600px">
<div id="text"></div> <div id="text"></div>
<div data-role="fieldcontain"> <div data-role="fieldcontain">
<input type="text" id="ext"/><br> <input type="text" id="ext"/><br>
...@@ -267,11 +267,11 @@ ...@@ -267,11 +267,11 @@
</div> </div>
<script type="text/javascript" src="js/jquery-2.0.3.min.js"></script> <script type="text/javascript" src="//code.jquery.com/jquery-2.1.1.min.js"></script>
<script type="text/javascript" src="js/jquery.mobile-1.3.2.min.js"></script> <script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquerymobile/1.4.3/jquery.mobile.min.js"></script>
<script type="text/javascript" src="js/jquery.json-2.4.min.js"></script> <script type="text/javascript" src="js/jquery.json-2.4.min.js"></script>
<script type="text/javascript" src="js/jquery.cookie.js"></script> <script type="text/javascript" src="js/jquery.cookie.js"></script>
<script type="text/javascript" src="js/jquery.dataTables.js"></script> <script type="text/javascript" src="//cdn.datatables.net/1.10.1/js/jquery.dataTables.min.js"></script>
<script type="text/javascript" src="js/verto-min.js"></script> <script type="text/javascript" src="js/verto-min.js"></script>
<script type="text/javascript" src="verto.js"></script> <script type="text/javascript" src="verto.js"></script>
......
This diff was suppressed by a .gitattributes entry.
...@@ -4,6 +4,9 @@ var confMan = null; ...@@ -4,6 +4,9 @@ var confMan = null;
var $display = $("#display"); var $display = $("#display");
var verto; var verto;
var ringing = false; var ringing = false;
var autocall = false;
$( ".selector" ).pagecontainer({ "theme": "a" });
function display(msg) { function display(msg) {
$("#calltitle").html(msg); $("#calltitle").html(msg);
...@@ -19,13 +22,11 @@ function clearConfMan() { ...@@ -19,13 +22,11 @@ function clearConfMan() {
} }
function goto_dialog(where) { function goto_dialog(where) {
$.mobile.changePage("#dialog-" + where, { $( ":mobile-pagecontainer" ).pagecontainer( "change", "#dialog-" + where, { role: "dialog" } );
role: "dialog"
});
} }
function goto_page(where) { function goto_page(where, force) {
$.mobile.changePage("#page-" + where); $( ":mobile-pagecontainer" ).pagecontainer( "change", "#page-" + where);
} }
var first_login = false; var first_login = false;
...@@ -58,7 +59,7 @@ var callbacks = { ...@@ -58,7 +59,7 @@ var callbacks = {
switch (msg) { switch (msg) {
case $.verto.enum.message.pvtEvent: case $.verto.enum.message.pvtEvent:
console.error("pvtEvent", data.pvtData.action); //console.error("pvtEvent", data.pvtData.action);
if (data.pvtData) { if (data.pvtData) {
switch (data.pvtData.action) { switch (data.pvtData.action) {
...@@ -139,6 +140,10 @@ var callbacks = { ...@@ -139,6 +140,10 @@ var callbacks = {
break; break;
case $.verto.enum.state.trying:
display("Calling: " + d.cidString());
goto_page("incall");
break;
case $.verto.enum.state.early: case $.verto.enum.state.early:
case $.verto.enum.state.active: case $.verto.enum.state.active:
display("Talking to: " + d.cidString()); display("Talking to: " + d.cidString());
...@@ -168,14 +173,22 @@ var callbacks = { ...@@ -168,14 +173,22 @@ var callbacks = {
if (success) { if (success) {
online(true); online(true);
/*
verto.subscribe("presence", { verto.subscribe("presence", {
handler: function(v, e) { handler: function(v, e) {
console.error("PRESENCE:", e); console.error("PRESENCE:", e);
} }
}); });
*/
if (!window.location.hash) { if (!window.location.hash) {
goto_page("main"); goto_page("main");
} }
if (autocall) {
autocall = false;
docall();
}
} else { } else {
goto_page("login"); goto_page("login");
goto_dialog("login-error"); goto_dialog("login-error");
...@@ -193,7 +206,7 @@ var callbacks = { ...@@ -193,7 +206,7 @@ var callbacks = {
}, },
onEvent: function(v, e) { onEvent: function(v, e) {
console.debug("w00t", e); console.debug("GOT EVENT", e);
}, },
}; };
...@@ -246,7 +259,7 @@ $("#webcam").click(function() { ...@@ -246,7 +259,7 @@ $("#webcam").click(function() {
check_vid(); check_vid();
}); });
$("#callbtn").click(function() { function docall() {
$('#ext').trigger('change'); $('#ext').trigger('change');
if (cur_call) { if (cur_call) {
...@@ -262,6 +275,10 @@ $("#callbtn").click(function() { ...@@ -262,6 +275,10 @@ $("#callbtn").click(function() {
useVideo: check_vid(), useVideo: check_vid(),
useStereo: $("#use_stereo").is(':checked') useStereo: $("#use_stereo").is(':checked')
}); });
}
$("#callbtn").click(function() {
docall();
}); });
function pop(id, cname, dft) { function pop(id, cname, dft) {
...@@ -366,58 +383,43 @@ function init() { ...@@ -366,58 +383,43 @@ function init() {
} }
$(document).ready(function() { $(document).ready(function() {
var autocall = false;
var hash = window.location.hash.substring(1); var hash = window.location.hash.substring(1);
if (hash && hash.indexOf("page-") == -1) { if (hash && hash.indexOf("page-") == -1) {
window.location.hash = ""; window.location.hash = "";
$("#ext").val(hash);
autocall = true; autocall = true;
} }
init(); init();
$("#page-incall").on("pagebeforechange", function(event) {}); });
if (autocall) {
$("#ext").val(hash);
$("#callbtn").trigger("click");
}
var lastTo = 0;
}); $(document).bind("pagecontainerchange", function(e, data) {
$(document).bind("pagebeforechange", function(e, data) { if (lastTo) {
if (typeof(data.toPage) !== "string") { clearTimeout(lastTo);
return;
} }
switch (window.location.hash) { switch (window.location.hash) {
case "#page-incall": case "#page-incall":
lastTo = setTimeout(function() {
console.error(e, data);
setTimeout(function() {
if (!cur_call) { if (!cur_call) {
goto_page("main"); goto_page("main");
} }
}, }, 1000);
10000);
break;
case "#page-main":
console.error(e, data);
setTimeout(function() {
if (cur_call && !ringing) {
goto_page("incall");
}
},
2000);
break; break;
case "#page-main":
break;
case "#page-login": case "#page-login":
setTimeout(function() { lastTo = setTimeout(function() {
if (online_visible) { if (online_visible) {
goto_page("main"); goto_page("main");
} }
...@@ -426,3 +428,4 @@ $(document).bind("pagebeforechange", function(e, data) { ...@@ -426,3 +428,4 @@ $(document).bind("pagebeforechange", function(e, data) {
break; break;
} }
}); });
...@@ -74,6 +74,8 @@ ...@@ -74,6 +74,8 @@
getSocket : function(onmessage_cb) { return self._getSocket(onmessage_cb); } getSocket : function(onmessage_cb) { return self._getSocket(onmessage_cb); }
}, options); }, options);
self.ws_cnt = 0;
// Declare an instance version of the onmessage callback to wrap 'this'. // Declare an instance version of the onmessage callback to wrap 'this'.
this.wsOnMessage = function(event) { self._wsOnMessage(event); }; this.wsOnMessage = function(event) { self._wsOnMessage(event); };
}; };
...@@ -261,42 +263,53 @@ ...@@ -261,42 +263,53 @@
$.JsonRpcClient.prototype.connectSocket = function(onmessage_cb) { $.JsonRpcClient.prototype.connectSocket = function(onmessage_cb) {
var self = this; var self = this;
if (self.to) {
clearTimeout(self.to);
}
if (!self.socketReady()) { if (!self.socketReady()) {
self.authing = false; self.authing = false;
if (self._ws_socket) {
delete self._ws_socket;
}
// No socket, or dying socket, let's get a new one. // No socket, or dying socket, let's get a new one.
this._ws_socket = new WebSocket(this.options.socketUrl); self._ws_socket = new WebSocket(self.options.socketUrl);
if (this._ws_socket) { if (self._ws_socket) {
// Set up onmessage handler. // Set up onmessage handler.
this._ws_socket.onmessage = onmessage_cb; self._ws_socket.onmessage = onmessage_cb;
this._ws_socket.onclose = function (w) { self._ws_socket.onclose = function (w) {
if (!self.ws_sleep) { if (!self.ws_sleep) {
self.ws_sleep = 2; self.ws_sleep = 500;
} }
self.ws_cnt = 0;
if (self.options.onWSClose) { if (self.options.onWSClose) {
self.options.onWSClose(self); self.options.onWSClose(self);
} }
console.error("Websocket Lost sleep: " + self.ws_sleep + "sec"); console.error("Websocket Lost " + self.ws_cnt + " sleep: " + self.ws_sleep + "msec");
setTimeout(function() { self.to = setTimeout(function() {
console.log("Attempting Reconnection...."); console.log("Attempting Reconnection....");
self.connectSocket(onmessage_cb); self.connectSocket(onmessage_cb);
}, self.ws_sleep * 1000); }, self.ws_sleep);
self.ws_cnt++;
if (++self.ws_cnt >= 150) { if (self.ws_sleep < 3000 && (self.ws_cnt % 100) == 0) {
self.ws_sleep = 30; self.ws_sleep += 500;
} }
} }
// Set up sending of message for when the socket is open. // Set up sending of message for when the socket is open.
this._ws_socket.onopen = function() { self._ws_socket.onopen = function() {
this.ws_sleep = 2; if (self.to) {
this.ws_cnt = 0; clearTimeout(self.to);
}
self.ws_sleep = 500;
self.ws_cnt = 0;
if (self.options.onWSConnect) { if (self.options.onWSConnect) {
self.options.onWSConnect(self); self.options.onWSConnect(self);
} }
...@@ -310,7 +323,7 @@ ...@@ -310,7 +323,7 @@
} }
} }
return this._ws_socket ? true : false; return self._ws_socket ? true : false;
} }
$.JsonRpcClient.prototype._getSocket = function(onmessage_cb) { $.JsonRpcClient.prototype._getSocket = function(onmessage_cb) {
......
...@@ -165,7 +165,7 @@ ...@@ -165,7 +165,7 @@
var verto = this; var verto = this;
var i; var i;
console.log("Response: " + method, success, e); //console.log("Response: " + method, success, e);
switch (method) { switch (method) {
case "verto.subscribe": case "verto.subscribe":
...@@ -864,7 +864,7 @@ ...@@ -864,7 +864,7 @@
var eventHandler = function(v, e, la) { var eventHandler = function(v, e, la) {
var packet = e.data; var packet = e.data;
console.error("READ:", packet); //console.error("READ:", packet);
if (packet.name != la.name) { if (packet.name != la.name) {
return; return;
...@@ -1079,7 +1079,7 @@ ...@@ -1079,7 +1079,7 @@
if (!args.data) { if (!args.data) {
return; return;
} }
console.debug(args, index); //console.debug(args, index);
dt.fnUpdate(args.data, index); dt.fnUpdate(args.data, index);
dt.fnAdjustColumnSizing(); dt.fnAdjustColumnSizing();
break; break;
...@@ -1539,7 +1539,7 @@ ...@@ -1539,7 +1539,7 @@
return false; return false;
} }
console.error("Dialog " + dialog.callID + ": state change from " + dialog.state.name + " to " + state.name); console.info("Dialog " + dialog.callID + ": state change from " + dialog.state.name + " to " + state.name);
dialog.lastState = dialog.state; dialog.lastState = dialog.state;
dialog.state = state; dialog.state = state;
...@@ -1580,7 +1580,7 @@ ...@@ -1580,7 +1580,7 @@
$.verto.dialog.prototype.processReply = function(method, success, e) { $.verto.dialog.prototype.processReply = function(method, success, e) {
var dialog = this; var dialog = this;
console.log("Response: " + method + " State:" + dialog.state.name, success, e); //console.log("Response: " + method + " State:" + dialog.state.name, success, e);
switch (method) { switch (method) {
......
...@@ -2024,8 +2024,10 @@ typedef enum { ...@@ -2024,8 +2024,10 @@ typedef enum {
#ifdef WIN32 #ifdef WIN32
typedef SOCKET switch_os_socket_t; typedef SOCKET switch_os_socket_t;
#define SWITCH_SOCK_INVALID INVALID_SOCKET
#else #else
typedef int switch_os_socket_t; typedef int switch_os_socket_t;
#define SWITCH_SOCK_INVALID -1
#endif #endif
typedef struct apr_pool_t switch_memory_pool_t; typedef struct apr_pool_t switch_memory_pool_t;
...@@ -2385,7 +2387,16 @@ typedef enum { ...@@ -2385,7 +2387,16 @@ typedef enum {
ICE_CONTROLLED = (1 << 2) ICE_CONTROLLED = (1 << 2)
} switch_core_media_ice_type_t; } switch_core_media_ice_type_t;
typedef enum {
SWITCH_POLL_READ = (1 << 0),
SWITCH_POLL_WRITE = (1 << 1),
SWITCH_POLL_ERROR = (1 << 2),
SWITCH_POLL_HUP = (1 << 3),
SWITCH_POLL_RDNORM = (1 << 4),
SWITCH_POLL_RDBAND = (1 << 5),
SWITCH_POLL_PRI = (1 << 6),
SWITCH_POLL_INVALID = (1 << 7)
} switch_poll_t;
SWITCH_END_EXTERN_C SWITCH_END_EXTERN_C
#endif #endif
......
...@@ -1066,7 +1066,7 @@ SWITCH_DECLARE(unsigned long) switch_atoul(const char *nptr); ...@@ -1066,7 +1066,7 @@ SWITCH_DECLARE(unsigned long) switch_atoul(const char *nptr);
* \return Pointer to message buffer, returning error message or "Unknown error xxx" if none found * \return Pointer to message buffer, returning error message or "Unknown error xxx" if none found
*/ */
SWITCH_DECLARE(char *) switch_strerror_r(int errnum, char *buf, switch_size_t buflen); SWITCH_DECLARE(char *) switch_strerror_r(int errnum, char *buf, switch_size_t buflen);
SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switch_poll_t flags);
SWITCH_END_EXTERN_C SWITCH_END_EXTERN_C
#endif #endif
/* For Emacs: /* For Emacs:
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
#include <unistd.h> #include <unistd.h>
#include "mcast.h" #include "mcast.h"
#include <poll.h> #include <poll.h>
#include <switch_utils.h>
int mcast_socket_create(const char *host, int16_t port, mcast_handle_t *handle, mcast_flag_t flags) int mcast_socket_create(const char *host, int16_t port, mcast_handle_t *handle, mcast_flag_t flags)
{ {
...@@ -151,7 +152,6 @@ ssize_t mcast_socket_send(mcast_handle_t *handle, void *data, size_t datalen) ...@@ -151,7 +152,6 @@ ssize_t mcast_socket_send(mcast_handle_t *handle, void *data, size_t datalen)
ssize_t mcast_socket_recv(mcast_handle_t *handle, void *data, size_t datalen, int ms) ssize_t mcast_socket_recv(mcast_handle_t *handle, void *data, size_t datalen, int ms)
{ {
socklen_t addrlen = sizeof(handle->recv_addr); socklen_t addrlen = sizeof(handle->recv_addr);
int r;
if (data == NULL || datalen == 0) { if (data == NULL || datalen == 0) {
data = handle->buffer; data = handle->buffer;
...@@ -159,16 +159,9 @@ ssize_t mcast_socket_recv(mcast_handle_t *handle, void *data, size_t datalen, in ...@@ -159,16 +159,9 @@ ssize_t mcast_socket_recv(mcast_handle_t *handle, void *data, size_t datalen, in
} }
if (ms > 0) { if (ms > 0) {
struct pollfd pfds[1]; int pflags = switch_wait_sock(handle->sock, ms, SWITCH_POLL_READ | SWITCH_POLL_ERROR | SWITCH_POLL_HUP);
pfds[0].fd = handle->sock;
pfds[0].events = POLLIN|POLLERR;
if ((r = poll(pfds, 1, ms)) <= 0) {
return r;
}
if (pfds[0].revents & POLLERR) { if ((pflags & SWITCH_POLL_ERROR) || (pflags & SWITCH_POLL_HUP)) {
return -1; return -1;
} }
} }
......
...@@ -554,6 +554,29 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t ...@@ -554,6 +554,29 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t
return r; return r;
} }
static switch_status_t jsock_queue_event(jsock_t *jsock, cJSON **json)
{
switch_status_t status = SWITCH_STATUS_FALSE;
if (switch_queue_trypush(jsock->event_queue, *json) == SWITCH_STATUS_SUCCESS) {
status = SWITCH_STATUS_SUCCESS;
if (jsock->lost_events) {
int le = jsock->lost_events;
jsock->lost_events = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Lost %d json events!\n", le);
}
} else {
if (++jsock->lost_events > MAX_MISSED) {
jsock->drop++;
}
cJSON_Delete(*json);
}
*json = NULL;
return status;
}
static void write_event(const char *event_channel, jsock_t *use_jsock, cJSON *event) static void write_event(const char *event_channel, jsock_t *use_jsock, cJSON *event)
{ {
...@@ -569,7 +592,8 @@ static void write_event(const char *event_channel, jsock_t *use_jsock, cJSON *ev ...@@ -569,7 +592,8 @@ static void write_event(const char *event_channel, jsock_t *use_jsock, cJSON *ev
params = cJSON_Duplicate(event, 1); params = cJSON_Duplicate(event, 1);
cJSON_AddItemToObject(params, "eventSerno", cJSON_CreateNumber(np->serno++)); cJSON_AddItemToObject(params, "eventSerno", cJSON_CreateNumber(np->serno++));
msg = jrpc_new_req("verto.event", NULL, &params); msg = jrpc_new_req("verto.event", NULL, &params);
ws_write_json(np->jsock, &msg, SWITCH_TRUE); //ws_write_json(np->jsock, &msg, SWITCH_TRUE);
jsock_queue_event(np->jsock, &msg);
} }
} }
} }
...@@ -601,7 +625,8 @@ static void jsock_send_event(cJSON *event) ...@@ -601,7 +625,8 @@ static void jsock_send_event(cJSON *event)
cJSON *msg = NULL, *params; cJSON *msg = NULL, *params;
params = cJSON_Duplicate(event, 1); params = cJSON_Duplicate(event, 1);
msg = jrpc_new_req("verto.event", NULL, &params); msg = jrpc_new_req("verto.event", NULL, &params);
ws_write_json(use_jsock, &msg, SWITCH_TRUE); //ws_write_json(use_jsock, &msg, SWITCH_TRUE);
jsock_queue_event(use_jsock, &msg);
switch_thread_rwlock_unlock(use_jsock->rwlock); switch_thread_rwlock_unlock(use_jsock->rwlock);
use_jsock = NULL; use_jsock = NULL;
return; return;
...@@ -747,6 +772,7 @@ static void check_permissions(jsock_t *jsock, switch_xml_t x_user, cJSON *params ...@@ -747,6 +772,7 @@ static void check_permissions(jsock_t *jsock, switch_xml_t x_user, cJSON *params
} }
} }
set_perm(allowed_methods, &jsock->allowed_methods); set_perm(allowed_methods, &jsock->allowed_methods);
set_perm(allowed_jsapi, &jsock->allowed_jsapi); set_perm(allowed_jsapi, &jsock->allowed_jsapi);
set_perm(allowed_fsapi, &jsock->allowed_fsapi); set_perm(allowed_fsapi, &jsock->allowed_fsapi);
...@@ -1186,6 +1212,19 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize ...@@ -1186,6 +1212,19 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize
return status; return status;
} }
static void jsock_check_event_queue(jsock_t *jsock)
{
void *pop;
int this_pass = switch_queue_size(jsock->event_queue);
switch_mutex_lock(jsock->write_mutex);
while(this_pass-- > 0 && switch_queue_trypop(jsock->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
cJSON *json = (cJSON *) pop;
ws_write_json(jsock, &json, SWITCH_TRUE);
}
switch_mutex_unlock(jsock->write_mutex);
}
static void client_run(jsock_t *jsock) static void client_run(jsock_t *jsock)
{ {
...@@ -1199,43 +1238,31 @@ static void client_run(jsock_t *jsock) ...@@ -1199,43 +1238,31 @@ static void client_run(jsock_t *jsock)
} }
while(jsock->profile->running) { while(jsock->profile->running) {
struct pollfd pfds[1]; int pflags = switch_wait_sock(jsock->client_socket, 50, SWITCH_POLL_READ | SWITCH_POLL_ERROR | SWITCH_POLL_HUP);
int res;
memset(&pfds[0], 0, sizeof(pfds[0]));
pfds[0].fd = jsock->client_socket; if (jsock->drop) {
pfds[0].events = POLLIN|POLLERR|POLLHUP|POLLRDNORM|POLLRDBAND|POLLPRI; die("%s Dropping Connection\n", jsock->name);
}
if (pflags < 0) {
if ((res = poll(pfds, 1, -1)) < 0) {
if (errno != EINTR) { if (errno != EINTR) {
die("%s POLL FAILED\n", jsock->name); die("%s POLL FAILED\n", jsock->name);
} }
} }
if (res < 0) { if (pflags & SWITCH_POLL_ERROR) {
die("%s POLL ERROR\n", jsock->name); die("%s POLL ERROR\n", jsock->name);
} }
if (jsock->drop) { if (pflags & SWITCH_POLL_HUP) {
die("%s Dropping Connection\n", jsock->name);
}
if (pfds[0].revents & POLLERR) {
die("%s POLL ERROR\n", jsock->name);
}
if (pfds[0].revents & POLLHUP) {
die("%s POLL HANGUP DETECTED\n", jsock->name); die("%s POLL HANGUP DETECTED\n", jsock->name);
} }
if (pfds[0].revents & POLLNVAL) { if (pflags & SWITCH_POLL_INVALID) {
die("%s POLL INVALID SOCKET\n", jsock->name); die("%s POLL INVALID SOCKET\n", jsock->name);
} }
if (pfds[0].revents & POLLIN) { if (pflags & SWITCH_POLL_READ) {
switch_ssize_t bytes; switch_ssize_t bytes;
ws_opcode_t oc; ws_opcode_t oc;
uint8_t *data; uint8_t *data;
...@@ -1257,6 +1284,8 @@ static void client_run(jsock_t *jsock) ...@@ -1257,6 +1284,8 @@ static void client_run(jsock_t *jsock)
switch_set_flag(jsock, JPFLAG_CHECK_ATTACH); switch_set_flag(jsock, JPFLAG_CHECK_ATTACH);
} }
} }
} else {
jsock_check_event_queue(jsock);
} }
} }
...@@ -1268,6 +1297,18 @@ static void client_run(jsock_t *jsock) ...@@ -1268,6 +1297,18 @@ static void client_run(jsock_t *jsock)
return; return;
} }
static void jsock_flush(jsock_t *jsock)
{
void *pop;
switch_mutex_lock(jsock->write_mutex);
while(switch_queue_trypop(jsock->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
cJSON *json = (cJSON *) pop;
cJSON_Delete(json);
}
switch_mutex_unlock(jsock->write_mutex);
}
static void *SWITCH_THREAD_FUNC client_thread(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC client_thread(switch_thread_t *thread, void *obj)
{ {
jsock_t *jsock = (jsock_t *) obj; jsock_t *jsock = (jsock_t *) obj;
...@@ -1302,6 +1343,7 @@ static void *SWITCH_THREAD_FUNC client_thread(switch_thread_t *thread, void *obj ...@@ -1302,6 +1343,7 @@ static void *SWITCH_THREAD_FUNC client_thread(switch_thread_t *thread, void *obj
switch_event_destroy(&jsock->allowed_jsapi); switch_event_destroy(&jsock->allowed_jsapi);
switch_event_destroy(&jsock->allowed_event_channels); switch_event_destroy(&jsock->allowed_event_channels);
jsock_flush(jsock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Ending client thread.\n", jsock->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Ending client thread.\n", jsock->name);
switch_thread_rwlock_wrlock(jsock->rwlock); switch_thread_rwlock_wrlock(jsock->rwlock);
...@@ -1772,7 +1814,8 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ ...@@ -1772,7 +1814,8 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
jmsg = jrpc_new_req("verto.display", tech_pvt->call_id, &params); jmsg = jrpc_new_req("verto.display", tech_pvt->call_id, &params);
cJSON_AddItemToObject(params, "display_name", cJSON_CreateString(name)); cJSON_AddItemToObject(params, "display_name", cJSON_CreateString(name));
cJSON_AddItemToObject(params, "display_number", cJSON_CreateString(number)); cJSON_AddItemToObject(params, "display_number", cJSON_CreateString(number));
ws_write_json(jsock, &jmsg, SWITCH_TRUE); //ws_write_json(jsock, &jmsg, SWITCH_TRUE);
jsock_queue_event(jsock, &jmsg);
} }
switch_thread_rwlock_unlock(jsock->rwlock); switch_thread_rwlock_unlock(jsock->rwlock);
...@@ -2709,7 +2752,8 @@ static switch_bool_t event_channel_check_auth(jsock_t *jsock, const char *event_ ...@@ -2709,7 +2752,8 @@ static switch_bool_t event_channel_check_auth(jsock_t *jsock, const char *event_
} }
} }
if (!(switch_event_get_header(jsock->allowed_event_channels, event_channel) || if ((!globals.enable_fs_events && (!strcasecmp(event_channel, "FSevent") || (main_event_channel && !strcasecmp(main_event_channel, "FSevent")))) ||
!(switch_event_get_header(jsock->allowed_event_channels, event_channel) ||
(main_event_channel && switch_event_get_header(jsock->allowed_event_channels, main_event_channel)))) { (main_event_channel && switch_event_get_header(jsock->allowed_event_channels, main_event_channel)))) {
ok = SWITCH_FALSE; ok = SWITCH_FALSE;
} }
...@@ -3062,6 +3106,8 @@ static int start_jsock(verto_profile_t *profile, int sock) ...@@ -3062,6 +3106,8 @@ static int start_jsock(verto_profile_t *profile, int sock)
td->pool = pool; td->pool = pool;
switch_mutex_init(&jsock->write_mutex, SWITCH_MUTEX_NESTED, jsock->pool); switch_mutex_init(&jsock->write_mutex, SWITCH_MUTEX_NESTED, jsock->pool);
switch_mutex_init(&jsock->filter_mutex, SWITCH_MUTEX_NESTED, jsock->pool);
switch_queue_create(&jsock->event_queue, MAX_QUEUE_LEN, jsock->pool);
switch_thread_rwlock_create(&jsock->rwlock, jsock->pool); switch_thread_rwlock_create(&jsock->rwlock, jsock->pool);
switch_thread_pool_launch_thread(&td); switch_thread_pool_launch_thread(&td);
...@@ -3561,6 +3607,8 @@ static switch_status_t parse_config(const char *cf) ...@@ -3561,6 +3607,8 @@ static switch_status_t parse_config(const char *cf)
} }
} else if (!strcasecmp(var, "enable-presence") && val) { } else if (!strcasecmp(var, "enable-presence") && val) {
globals.enable_presence = switch_true(val); globals.enable_presence = switch_true(val);
} else if (!strcasecmp(var, "enable-fs-events") && val) {
globals.enable_fs_events = switch_true(val);
} else if (!strcasecmp(var, "detach-timeout-sec") && val) { } else if (!strcasecmp(var, "detach-timeout-sec") && val) {
int tmp = atoi(val); int tmp = atoi(val);
if (tmp > 0) { if (tmp > 0) {
...@@ -3917,9 +3965,10 @@ static int verto_send_chat(const char *uid, const char *call_id, cJSON *msg) ...@@ -3917,9 +3965,10 @@ static int verto_send_chat(const char *uid, const char *call_id, cJSON *msg)
jsock_t *jsock; jsock_t *jsock;
if ((jsock = get_jsock(tech_pvt->jsock_uuid))) { if ((jsock = get_jsock(tech_pvt->jsock_uuid))) {
if (ws_write_json(jsock, &msg, SWITCH_FALSE) <= 0) { jsock_queue_event(jsock, &msg);
switch_channel_hangup(switch_core_session_get_channel(session), SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); //if (ws_write_json(jsock, &msg, SWITCH_FALSE) <= 0) {
} // switch_channel_hangup(switch_core_session_get_channel(session), SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
//}
switch_thread_rwlock_unlock(jsock->rwlock); switch_thread_rwlock_unlock(jsock->rwlock);
done = 1; done = 1;
} }
...@@ -3939,7 +3988,8 @@ static int verto_send_chat(const char *uid, const char *call_id, cJSON *msg) ...@@ -3939,7 +3988,8 @@ static int verto_send_chat(const char *uid, const char *call_id, cJSON *msg)
for(jsock = profile->jsock_head; jsock; jsock = jsock->next) { for(jsock = profile->jsock_head; jsock; jsock = jsock->next) {
if (!strcmp(uid, jsock->uid)) { if (!strcmp(uid, jsock->uid)) {
ws_write_json(jsock, &msg, SWITCH_FALSE); //ws_write_json(jsock, &msg, SWITCH_FALSE);
jsock_queue_event(jsock, &msg);
hits++; hits++;
} }
} }
...@@ -4351,6 +4401,10 @@ static void presence_event_handler(switch_event_t *event) ...@@ -4351,6 +4401,10 @@ static void presence_event_handler(switch_event_t *event)
char *event_channel; char *event_channel;
const char *presence_id = switch_event_get_header(event, "channel-presence-id"); const char *presence_id = switch_event_get_header(event, "channel-presence-id");
if (!globals.running) {
return;
}
if (!globals.enable_presence || zstr(presence_id)) { if (!globals.enable_presence || zstr(presence_id)) {
return; return;
} }
...@@ -4388,6 +4442,38 @@ static void presence_event_handler(switch_event_t *event) ...@@ -4388,6 +4442,38 @@ static void presence_event_handler(switch_event_t *event)
} }
static void event_handler(switch_event_t *event)
{
cJSON *msg = NULL, *data = NULL;
char *event_channel;
if (!globals.enable_fs_events) {
return;
}
switch_event_serialize_json_obj(event, &data);
msg = cJSON_CreateObject();
if (event->event_id == SWITCH_EVENT_CUSTOM) {
const char *subclass = switch_event_get_header(event, "Event-Subclass");
event_channel = switch_mprintf("FSevent.%s::%s", switch_event_name(event->event_id), subclass);
switch_tolower_max(event_channel + 8);
} else {
event_channel = switch_mprintf("FSevent.%s", switch_event_name(event->event_id));
switch_tolower_max(event_channel + 8);
}
cJSON_AddItemToObject(msg, "eventChannel", cJSON_CreateString(event_channel));
cJSON_AddItemToObject(msg, "data", data);
/* comment broadcasting globally and change to only within the module cos FS events are heavy */
//switch_event_channel_broadcast(event_channel, &msg, __FILE__, NO_EVENT_CHANNEL_ID);
verto_broadcast(event_channel, msg, __FILE__, NO_EVENT_CHANNEL_ID);cJSON_Delete(msg);
free(event_channel);
}
/* Macro expands to: switch_status_t mod_verto_load(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) */ /* Macro expands to: switch_status_t mod_verto_load(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) */
SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
...@@ -4403,6 +4489,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) ...@@ -4403,6 +4489,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
globals.pool = pool; globals.pool = pool;
globals.ready = SIGUSR1; globals.ready = SIGUSR1;
globals.enable_presence = SWITCH_TRUE; globals.enable_presence = SWITCH_TRUE;
globals.enable_fs_events = SWITCH_FALSE;
switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
...@@ -4423,8 +4510,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) ...@@ -4423,8 +4510,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
globals.detach_timeout = 120; globals.detach_timeout = 120;
switch_event_bind(modname, SWITCH_EVENT_CHANNEL_CALLSTATE, SWITCH_EVENT_SUBCLASS_ANY, presence_event_handler, NULL);
memset(&json_GLOBALS, 0, sizeof(json_GLOBALS)); memset(&json_GLOBALS, 0, sizeof(json_GLOBALS));
...@@ -4462,6 +4547,17 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) ...@@ -4462,6 +4547,17 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
switch_core_register_secondary_recover_callback(modname, verto_recover_callback); switch_core_register_secondary_recover_callback(modname, verto_recover_callback);
if (globals.enable_presence) {
switch_event_bind(modname, SWITCH_EVENT_CHANNEL_CALLSTATE, SWITCH_EVENT_SUBCLASS_ANY, presence_event_handler, NULL);
}
if (globals.enable_fs_events) {
if (switch_event_bind(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
return SWITCH_STATUS_GENERR;
}
}
run_profiles(); run_profiles();
/* indicate that the module should continue to be loaded */ /* indicate that the module should continue to be loaded */
...@@ -4478,6 +4574,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_verto_shutdown) ...@@ -4478,6 +4574,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_verto_shutdown)
switch_event_channel_unbind(NULL, verto_broadcast); switch_event_channel_unbind(NULL, verto_broadcast);
switch_event_unbind_callback(presence_event_handler); switch_event_unbind_callback(presence_event_handler);
switch_event_unbind_callback(event_handler);
switch_core_unregister_secondary_recover_callback(modname); switch_core_unregister_secondary_recover_callback(modname);
do_shutdown(); do_shutdown();
......
...@@ -51,6 +51,9 @@ ...@@ -51,6 +51,9 @@
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include "mcast.h" #include "mcast.h"
#define MAX_QUEUE_LEN 100000
#define MAX_MISSED 500
#define MAXPENDING 10000 #define MAXPENDING 10000
#define STACK_SIZE 80 * 1024 #define STACK_SIZE 80 * 1024
...@@ -73,7 +76,10 @@ typedef enum { ...@@ -73,7 +76,10 @@ typedef enum {
typedef enum { typedef enum {
JPFLAG_INIT = (1 << 0), JPFLAG_INIT = (1 << 0),
JPFLAG_AUTHED = (1 << 1), JPFLAG_AUTHED = (1 << 1),
JPFLAG_CHECK_ATTACH = (1 << 2) JPFLAG_CHECK_ATTACH = (1 << 2),
JPFLAG_EVENTS = (1 << 3),
JPFLAG_AUTH_EVENTS = (1 << 4),
JPFLAG_ALL_EVENTS_AUTHED = (1 << 5)
} jpflag_t; } jpflag_t;
struct verto_profile_s; struct verto_profile_s;
...@@ -114,10 +120,14 @@ struct jsock_s { ...@@ -114,10 +120,14 @@ struct jsock_s {
switch_thread_rwlock_t *rwlock; switch_thread_rwlock_t *rwlock;
switch_mutex_t *write_mutex; switch_mutex_t *write_mutex;
switch_mutex_t *filter_mutex;
switch_event_t *params; switch_event_t *params;
switch_event_t *vars; switch_event_t *vars;
switch_queue_t *event_queue;
int lost_events;
struct jsock_s *next; struct jsock_s *next;
}; };
...@@ -231,6 +241,7 @@ struct globals_s { ...@@ -231,6 +241,7 @@ struct globals_s {
int ready; int ready;
int profile_threads; int profile_threads;
int enable_presence; int enable_presence;
int enable_fs_events;
switch_hash_t *jsock_hash; switch_hash_t *jsock_hash;
switch_mutex_t *jsock_mutex; switch_mutex_t *jsock_mutex;
......
...@@ -2491,6 +2491,178 @@ SWITCH_DECLARE(char *) switch_util_quote_shell_arg_pool(const char *string, swit ...@@ -2491,6 +2491,178 @@ SWITCH_DECLARE(char *) switch_util_quote_shell_arg_pool(const char *string, swit
return dest; return dest;
} }
#ifdef HAVE_POLL
#include <poll.h>
SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switch_poll_t flags)
{
struct pollfd pfds[2] = { { 0 } };
int s = 0, r = 0;
if (sock == SWITCH_SOCK_INVALID) {
return SWITCH_SOCK_INVALID;
}
pfds[0].fd = sock;
if ((flags & SWITCH_POLL_READ)) {
pfds[0].events |= POLLIN;
}
if ((flags & SWITCH_POLL_WRITE)) {
pfds[0].events |= POLLOUT;
}
if ((flags & SWITCH_POLL_ERROR)) {
pfds[0].events |= POLLERR;
}
if ((flags & SWITCH_POLL_HUP)) {
pfds[0].events |= POLLHUP;
}
if ((flags & SWITCH_POLL_RDNORM)) {
pfds[0].events |= POLLRDNORM;
}
if ((flags & SWITCH_POLL_RDBAND)) {
pfds[0].events |= POLLRDBAND;
}
if ((flags & SWITCH_POLL_PRI)) {
pfds[0].events |= POLLPRI;
}
s = poll(pfds, 1, ms);
if (s < 0) {
r = s;
} else if (s > 0) {
if ((pfds[0].revents & POLLIN)) {
r |= SWITCH_POLL_READ;
}
if ((pfds[0].revents & POLLOUT)) {
r |= SWITCH_POLL_WRITE;
}
if ((pfds[0].revents & POLLERR)) {
r |= SWITCH_POLL_ERROR;
}
if ((pfds[0].revents & POLLHUP)) {
r |= SWITCH_POLL_HUP;
}
if ((pfds[0].revents & POLLRDNORM)) {
r |= SWITCH_POLL_RDNORM;
}
if ((pfds[0].revents & POLLRDBAND)) {
r |= SWITCH_POLL_RDBAND;
}
if ((pfds[0].revents & POLLPRI)) {
r |= SWITCH_POLL_PRI;
}
if ((pfds[0].revents & POLLNVAL)) {
r |= SWITCH_POLL_INVALID;
}
}
return r;
}
#else
/* use select instead of poll */
SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switch_poll_t flags)
{
int s = 0, r = 0;
fd_set *rfds;
fd_set *wfds;
fd_set *efds;
struct timeval tv;
if (sock == SWITCH_SOCK_INVALID) {
return SWITCH_SOCK_INVALID;
}
rfds = malloc(sizeof(fd_set));
wfds = malloc(sizeof(fd_set));
efds = malloc(sizeof(fd_set));
FD_ZERO(rfds);
FD_ZERO(wfds);
FD_ZERO(efds);
#ifndef WIN32
/* Wouldn't you rather know?? */
assert(sock <= FD_SETSIZE);
#endif
if ((flags & SWITCH_POLL_READ)) {
#ifdef WIN32
#pragma warning( push )
#pragma warning( disable : 4127 )
FD_SET(sock, rfds);
#pragma warning( pop )
#else
FD_SET(sock, rfds);
#endif
}
if ((flags & SWITCH_POLL_WRITE)) {
#ifdef WIN32
#pragma warning( push )
#pragma warning( disable : 4127 )
FD_SET(sock, wfds);
#pragma warning( pop )
#else
FD_SET(sock, wfds);
#endif
}
if ((flags & SWITCH_POLL_ERROR)) {
#ifdef WIN32
#pragma warning( push )
#pragma warning( disable : 4127 )
FD_SET(sock, efds);
#pragma warning( pop )
#else
FD_SET(sock, efds);
#endif
}
tv.tv_sec = ms / 1000;
tv.tv_usec = (ms % 1000) * ms;
s = select(sock + 1, (flags & SWITCH_POLL_READ) ? rfds : NULL, (flags & SWITCH_POLL_WRITE) ? wfds : NULL, (flags & SWITCH_POLL_ERROR) ? efds : NULL, &tv);
if (s < 0) {
r = s;
} else if (s > 0) {
if ((flags & SWITCH_POLL_READ) && FD_ISSET(sock, rfds)) {
r |= SWITCH_POLL_READ;
}
if ((flags & SWITCH_POLL_WRITE) && FD_ISSET(sock, wfds)) {
r |= SWITCH_POLL_WRITE;
}
if ((flags & SWITCH_POLL_ERROR) && FD_ISSET(sock, efds)) {
r |= SWITCH_POLL_ERROR;
}
}
free(rfds);
free(wfds);
free(efds);
return r;
}
#endif
SWITCH_DECLARE(int) switch_socket_waitfor(switch_pollfd_t *poll, int ms) SWITCH_DECLARE(int) switch_socket_waitfor(switch_pollfd_t *poll, int ms)
{ {
int nsds = 0; int nsds = 0;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论