提交 828d6eaf authored 作者: Anthony Minessale's avatar Anthony Minessale

first pass, add some funcs to conference and speed test features and fix bugs in…

first pass, add some funcs to conference and speed test features and fix bugs in ws.c for big payloads
上级 91e86ab3
......@@ -91,6 +91,19 @@
/// The next JSON-RPC request id.
$.JsonRpcClient.prototype._current_id = 1;
$.JsonRpcClient.prototype.speedTest = function (bytes, cb) {
var socket = this.options.getSocket(this.wsOnMessage);
if (socket !== null) {
this.speedCB = cb;
this.speedBytes = bytes;
socket.send("#SPU");
socket.send("#SPB\n" + new Array(bytes).join("."));
}
};
/**
* @fn call
* @memberof $.JsonRpcClient
......@@ -382,6 +395,26 @@
$.JsonRpcClient.prototype._wsOnMessage = function(event) {
// Check if this could be a JSON RPC message.
var response;
// Special sub proto
if (event.data[0] == "#" && event.data[1] == "S" && event.data[2] == "P") {
if (event.data[3] == "U") {
this.up_dur = parseInt(event.data.substring(4));
} else if (this.speedCB && event.data[3] == "D") {
this.down_dur = parseInt(event.data.substring(4));
var up_kps = (((this.speedBytes * 8) / (this.up_dur / 1000)) / 1024).toFixed(0);
var down_kps = (((this.speedBytes * 8) / (this.down_dur / 1000)) / 1024).toFixed(0);
console.info("Speed Test: Up: " + up_kps + " Down: " + down_kps);
this.speedCB(event, { upDur: this.up_dur, downDur: this.down_dur, upKPS: up_kps, downKPS: down_kps });
this.speedCB = null;
}
return;
}
try {
response = $.parseJSON(event.data);
......
......@@ -91,7 +91,7 @@ div#preload { display: none; }
<body>
<div data-role="page" id="page-login" align="center">
<div data-role="header" class="page-header">
FreeSWITCH Verto&trade; Video Transcoding Demo
FreeSWITCH Verto&trade; Video Transcoding Demo
</div>
</div>
......@@ -108,6 +108,18 @@ div#preload { display: none; }
</div>
<div data-role="page" id="page-bwtest" align="center">
<div data-role="header" class="page-header">
FreeSWITCH Verto&trade; Testing Network Connection
</div>
<h1>Testing Network Connection</h1>
<img src="images/speed.gif"/>
</div>
<div data-role="page" id="page-incall" align="center">
<div data-role="header" id="calltitle" class="pageheader">
Verto&trade; IN CALL
......@@ -248,7 +260,7 @@ div#preload { display: none; }
<div data-role="page" id="page-main" align="center">
<div data-role="header" class="pageheader">
FreeSWITCH Verto&trade; Video Transcoding Demo
FreeSWITCH Verto&trade; Video Transcoding Demo (<span id="bwinfo">*checking*</span>)
</div>
<br>
<center> <table width="1024" border="0">
......@@ -347,7 +359,6 @@ if ($('#devices').is(':visible')) {
}
</script>
<!--<button data-inline="true" id="showdemo" onclick="toggle_demo();">View Demo Extensions</button>-->
<button data-inline="true" id="showdevices" onclick="toggle_device();">View Device Settings</button>
<button data-inline="true"id="logoutbtn">Log Out</button>
......
This diff was suppressed by a .gitattributes entry.
......@@ -559,8 +559,25 @@ var callbacks = {
ringing = false;
if (success) {
online(true);
vertoHandle.rpcClient.speedTest(1024 * 256, function(e, obj) {
//console.error("Up: " + obj.upKPS, "Down: ", obj.downKPS);
if (outgoingBandwidth === "default") {
outgoingBandwidth = obj.upKPS * .75;
}
if (incomingBandwidth === "default") {
incomingBandwidth = obj.downKPS * .75;
}
//console.error(outgoingBandwidth, incomingBandwidth);
$("#bwinfo").html("<b>Bandwidth: " + "Up: " + obj.upKPS + " Down: " + obj.downKPS + "</b>");
online(true);
goto_page("main");
$("input[type='radio']").checkboxradio("refresh");
$("input[type='checkbox']").checkboxradio("refresh");
});
/*
verto.subscribe("presence", {
handler: function(v, e) {
......@@ -993,8 +1010,10 @@ function refresh_devices()
$("#useshare").selectmenu('refresh', true);
//$("input[type='radio']).checkboxradio({});
$("input[type='radio']").checkboxradio("refresh");
$("input[type='checkbox']").checkboxradio("refresh");
//$("input[type='radio']").checkboxradio("refresh");
//$("input[type='checkbox']").checkboxradio("refresh");
//console.error($("#usecamera").find(":selected").val());
//$.FSRTC.getValidRes($("#usecamera").find(":selected").val(), undefined);
......@@ -1021,7 +1040,7 @@ function refresh_devices()
function init() {
cur_call = null;
goto_page("main");
goto_page("bwtest");
$("#usecamera").selectmenu({});
$("#usemic").selectmenu({});
......
......@@ -1828,13 +1828,66 @@ void conference_video_pop_next_image(conference_member_t *member, switch_image_t
*imgP = img;
}
void conference_video_set_incoming_bitrate(conference_member_t *member, int kps)
{
switch_core_session_message_t msg = { 0 };
msg.message_id = SWITCH_MESSAGE_INDICATE_BITRATE_REQ;
msg.numeric_arg = kps * 1024;
msg.from = __FILE__;
switch_core_session_receive_message(member->session, &msg);
member->managed_kps = kps;
}
void conference_video_set_max_incoming_bitrate_member(conference_member_t *member, int kps)
{
member->max_bw_in = kps;
member->managed_kps = 0;
}
void conference_video_set_absolute_incoming_bitrate_member(conference_member_t *member, int kps)
{
member->max_bw_in = 0;
member->force_bw_in = kps;
member->managed_kps = 0;
if (!conference_utils_test_flag(member->conference, CFLAG_MANAGE_INBOUND_VIDEO_BITRATE) && switch_channel_test_flag(member->channel, CF_VIDEO)) {
conference_video_set_incoming_bitrate(member, kps);
}
}
void conference_video_set_max_incoming_bitrate(conference_obj_t *conference, int kps)
{
conference_member_t *imember;
switch_mutex_lock(conference->member_mutex);
for (imember = conference->members; imember; imember = imember->next) {
if (imember->channel && switch_channel_ready(imember->channel) && conference_utils_member_test_flag(imember, MFLAG_RUNNING)) {
conference_video_set_max_incoming_bitrate_member(imember, kps);
}
}
switch_mutex_unlock(conference->member_mutex);
}
void conference_video_set_absolute_incoming_bitrate(conference_obj_t *conference, int kps)
{
conference_member_t *imember;
switch_mutex_lock(conference->member_mutex);
for (imember = conference->members; imember; imember = imember->next) {
if (imember->channel && switch_channel_ready(imember->channel) && conference_utils_member_test_flag(imember, MFLAG_RUNNING)) {
conference_video_set_absolute_incoming_bitrate_member(imember, kps);
}
}
switch_mutex_unlock(conference->member_mutex);
}
void conference_video_check_auto_bitrate(conference_member_t *member, mcu_layer_t *layer)
{
if (switch_channel_test_flag(member->channel, CF_VIDEO_BITRATE_UNMANAGABLE)) {
member->managed_kps = 0;
} else if (conference_utils_test_flag(member->conference, CFLAG_MANAGE_INBOUND_VIDEO_BITRATE) && !member->managed_kps) {
switch_core_session_message_t msg = { 0 };
int kps;
int w = 320;
int h = 240;
......@@ -1845,23 +1898,40 @@ void conference_video_check_auto_bitrate(conference_member_t *member, mcu_layer_
h = layer->screen_h;
}
}
if (!layer || !conference_utils_member_test_flag(member, MFLAG_CAN_BE_SEEN) || member->avatar_png_img) {
kps = 200;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "%s auto-setting bitrate to %dkps because user's image is not visible\n",
if (member->conference->force_bw_in || member->force_bw_in) {
if (!(kps = member->conference->force_bw_in)) {
kps = member->force_bw_in;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "%s setting bitrate to %dkps because it was forced.\n",
switch_channel_get_name(member->channel), kps);
} else {
kps = switch_calc_bitrate(w, h, 2, (int)(member->conference->video_fps.fps));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "%s auto-setting bitrate to %dkps to accomodate %dx%d resolution\n",
switch_channel_get_name(member->channel), kps, layer->screen_w, layer->screen_h);
}
int max = 0;
msg.message_id = SWITCH_MESSAGE_INDICATE_BITRATE_REQ;
msg.numeric_arg = kps * 1024;
msg.from = __FILE__;
if (!layer || !conference_utils_member_test_flag(member, MFLAG_CAN_BE_SEEN) || member->avatar_png_img) {
kps = 256;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "%s auto-setting bitrate to %dkps because user's image is not visible\n",
switch_channel_get_name(member->channel), kps);
} else {
kps = switch_calc_bitrate(w, h, 1, (int)(member->conference->video_fps.fps));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "%s auto-setting bitrate to %dkps to accomodate %dx%d resolution\n",
switch_channel_get_name(member->channel), kps, layer->screen_w, layer->screen_h);
}
if (member->conference->max_bw_in) {
max = member->conference->max_bw_in;
} else {
max = member->max_bw_in;
}
if (max && kps > max) {
kps = max;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "%s overriding bitrate setting to %dkps because it was the max allowed.\n",
switch_channel_get_name(member->channel), kps);
}
}
switch_core_session_receive_message(member->session, &msg);
member->managed_kps = kps;
conference_video_set_incoming_bitrate(member, kps);
}
}
......
......@@ -2862,7 +2862,7 @@ conference_obj_t *conference_new(char *name, conference_xml_cfg_t cfg, switch_co
if (video_codec_bandwidth) {
if (!strcasecmp(video_codec_bandwidth, "auto")) {
conference->video_codec_settings.video.bandwidth = switch_calc_bitrate(canvas_w, canvas_h, 2, (int)conference->video_fps.fps);
conference->video_codec_settings.video.bandwidth = switch_calc_bitrate(canvas_w, canvas_h, 1, (int)conference->video_fps.fps);
} else {
conference->video_codec_settings.video.bandwidth = switch_parse_bandwidth_string(video_codec_bandwidth);
}
......
......@@ -657,6 +657,8 @@ typedef struct conference_obj {
int recording_members;
uint32_t video_floor_packets;
video_layout_t *new_personal_vlayout;
int max_bw_in;
int force_bw_in;
} conference_obj_t;
/* Relationship with another member */
......@@ -757,6 +759,8 @@ struct conference_member {
switch_media_flow_t video_media_flow;
mcu_canvas_t *canvas;
switch_image_t *pcanvas_img;
int max_bw_in;
int force_bw_in;
};
typedef enum {
......
......@@ -1844,15 +1844,50 @@ static void client_run(jsock_t *jsock)
switch_ssize_t bytes;
ws_opcode_t oc;
uint8_t *data;
bytes = ws_read_frame(&jsock->ws, &oc, &data);
if (bytes < 0) {
die("BAD READ %" SWITCH_SSIZE_T_FMT "\n", bytes);
break;
}
if (bytes) {
char *s = (char *) data;
if (*s == '#') {
char repl[80] = "", *s = (char *) data;
switch_time_t a, b;
if (s[1] == 'S' && s[2] == 'P') {
if (s[3] == 'U') {
a = switch_time_now();
bytes = ws_read_frame(&jsock->ws, &oc, &data);
b = switch_time_now();
if (!bytes || !data) continue;
switch_snprintf(repl, sizeof(repl), "#SPU %ld", (b - a) / 1000);
ws_write_frame(&jsock->ws, WSOC_TEXT, repl, strlen(repl));
s = (char *) data;
s[3] = 'B';
a = switch_time_now();
ws_write_frame(&jsock->ws, WSOC_TEXT, data, bytes);
b = switch_time_now();
switch_snprintf(repl, sizeof(repl), "#SPD %ld", (b - a) / 1000);
ws_write_frame(&jsock->ws, WSOC_TEXT, repl, strlen(repl));
}
}
continue;
}
if (process_input(jsock, data, bytes) != SWITCH_STATUS_SUCCESS) {
die("Input Error\n");
}
......
......@@ -264,7 +264,7 @@ int ws_handshake(wsh_t *wsh)
}
}
if (bytes > sizeof(wsh->buffer) -1) {
if (bytes > wsh->buflen -1) {
goto err;
}
......@@ -362,7 +362,7 @@ ssize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes, int block)
}
}
} while (r == -1 && err == SSL_ERROR_WANT_READ && wsh->x < 100);
} while (r == -1 && err == SSL_ERROR_WANT_READ && wsh->x < 1000);
goto end;
}
......@@ -382,7 +382,7 @@ ssize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes, int block)
ms_sleep(10);
}
}
} while (r == -1 && xp_is_blocking(xp_errno()) && wsh->x < 100);
} while (r == -1 && xp_is_blocking(xp_errno()) && wsh->x < 1000);
if (wsh->x >= 1000 || (block && wsh->x >= 100)) {
r = -1;
......@@ -596,7 +596,15 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock, int
wsh->close_sock = 1;
}
wsh->buflen = sizeof(wsh->buffer);
wsh->buflen = 1024 * 64;
wsh->bbuflen = wsh->buflen;
wsh->buffer = malloc(wsh->buflen);
wsh->bbuffer = malloc(wsh->bbuflen);
//printf("init %p %ld\n", (void *) wsh->bbuffer, wsh->bbuflen);
//memset(wsh->buffer, 0, wsh->buflen);
//memset(wsh->bbuffer, 0, wsh->bbuflen);
wsh->secure = ssl_ctx ? 1 : 0;
setup_socket(sock);
......@@ -644,6 +652,12 @@ void ws_destroy(wsh_t *wsh)
SSL_free(wsh->ssl);
wsh->ssl = NULL;
}
if (wsh->buffer) free(wsh->buffer);
if (wsh->bbuffer) free(wsh->bbuffer);
wsh->buffer = wsh->bbuffer = NULL;
}
ssize_t ws_close(wsh_t *wsh, int16_t reason)
......@@ -685,6 +699,20 @@ ssize_t ws_close(wsh_t *wsh, int16_t reason)
}
uint64_t hton64(uint64_t val)
{
if (__BYTE_ORDER == __BIG_ENDIAN) return (val);
else return __bswap_64(val);
}
uint64_t ntoh64(uint64_t val)
{
if (__BYTE_ORDER == __BIG_ENDIAN) return (val);
else return __bswap_64(val);
}
ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
{
......@@ -692,6 +720,10 @@ ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
char *maskp;
int ll = 0;
int frag = 0;
int blen;
wsh->body = wsh->bbuffer;
wsh->packetlen = 0;
again:
need = 2;
......@@ -745,12 +777,11 @@ ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
int fin = (wsh->buffer[0] >> 7) & 1;
int mask = (wsh->buffer[1] >> 7) & 1;
if (fin) {
if (*oc == WSOC_CONTINUATION) {
frag = 1;
} else {
frag = 0;
}
if (!fin && *oc != WSOC_CONTINUATION) {
frag = 1;
} else if (fin && *oc == WSOC_CONTINUATION) {
frag = 0;
}
if (mask) {
......@@ -765,23 +796,33 @@ ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
wsh->plen = wsh->buffer[1] & 0x7f;
wsh->payload = &wsh->buffer[2];
if (wsh->plen == 127) {
uint64_t *u64;
int more = 0;
need += 8;
if (need > wsh->datalen) {
/* too small - protocol err */
*oc = WSOC_CLOSE;
return ws_close(wsh, WS_PROTO_ERR);
}
//*oc = WSOC_CLOSE;
//return ws_close(wsh, WS_PROTO_ERR);
u64 = (uint64_t *) wsh->payload;
wsh->payload += 8;
more = ws_raw_read(wsh, wsh->buffer + wsh->datalen, need - wsh->datalen, WS_BLOCK);
if (more < need - wsh->datalen) {
*oc = WSOC_CLOSE;
return ws_close(wsh, WS_PROTO_ERR);
} else {
wsh->datalen += more;
}
wsh->plen = ntohl((u_long)*u64);
}
u64 = (uint64_t *) wsh->payload;
wsh->payload += 8;
wsh->plen = ntoh64(*u64);
} else if (wsh->plen == 126) {
uint16_t *u16;
......@@ -811,16 +852,30 @@ ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
return ws_close(wsh, WS_PROTO_ERR);
}
if ((need + wsh->datalen) > (ssize_t)wsh->buflen) {
/* too big - Ain't nobody got time fo' dat */
*oc = WSOC_CLOSE;
return ws_close(wsh, WS_DATA_TOO_BIG);
blen = wsh->body - wsh->bbuffer;
if (need + blen > (ssize_t)wsh->bbuflen) {
void *tmp;
wsh->bbuflen = need + blen + wsh->rplen;
if ((tmp = realloc(wsh->bbuffer, wsh->bbuflen))) {
wsh->bbuffer = tmp;
} else {
abort();
}
wsh->body = wsh->bbuffer + blen;
}
wsh->rplen = wsh->plen - need;
if (wsh->rplen) {
memcpy(wsh->body, wsh->payload, wsh->rplen);
}
while(need) {
ssize_t r = ws_raw_read(wsh, wsh->payload + wsh->rplen, need, WS_BLOCK);
ssize_t r = ws_raw_read(wsh, wsh->body + wsh->rplen, need, WS_BLOCK);
if (r < 1) {
/* invalid read - protocol err .. */
......@@ -837,28 +892,30 @@ ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
ssize_t i;
for (i = 0; i < wsh->datalen; i++) {
wsh->payload[i] ^= maskp[i % 4];
wsh->body[i] ^= maskp[i % 4];
}
}
if (*oc == WSOC_PING) {
ws_write_frame(wsh, WSOC_PONG, wsh->payload, wsh->rplen);
ws_write_frame(wsh, WSOC_PONG, wsh->body, wsh->rplen);
goto again;
}
*(wsh->body+wsh->rplen) = '\0';
wsh->packetlen += wsh->rplen;
wsh->body += wsh->rplen;
if (frag) {
goto again;
}
*(wsh->payload+wsh->rplen) = '\0';
*data = (uint8_t *)wsh->payload;
//printf("READ[%ld][%d]-----------------------------:\n[%s]\n-------------------------------\n", wsh->rplen, *oc, (char *)*data);
*data = (uint8_t *)wsh->bbuffer;
//printf("READ[%ld][%d]-----------------------------:\n[%s]\n-------------------------------\n", wsh->packetlen, *oc, (char *)*data);
return wsh->rplen;
return wsh->packetlen;
}
break;
default:
......@@ -871,6 +928,7 @@ ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
}
}
#if 0
ssize_t ws_feed_buf(wsh_t *wsh, void *data, size_t bytes)
{
......@@ -885,6 +943,7 @@ ssize_t ws_feed_buf(wsh_t *wsh, void *data, size_t bytes)
return bytes;
}
ssize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc)
{
ssize_t r = 0;
......@@ -899,7 +958,7 @@ ssize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc)
return r;
}
#endif
ssize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes)
{
......@@ -934,7 +993,7 @@ ssize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes)
hlen += 8;
u64 = (uint64_t *) &hdr[2];
*u64 = htonl(bytes);
*u64 = hton64(bytes);
}
if (wsh->write_buffer_len < (hlen + bytes + 1)) {
......
......@@ -78,15 +78,18 @@ typedef enum {
typedef struct wsh_s {
ws_socket_t sock;
char buffer[65536];
char wbuffer[65536];
char *buffer;
char *bbuffer;
char *body;
char *uri;
size_t buflen;
size_t bbuflen;
ssize_t datalen;
ssize_t wdatalen;
char *payload;
ssize_t plen;
ssize_t rplen;
ssize_t packetlen;
SSL *ssl;
int handshake;
uint8_t down;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论