Skip to content
项目
群组
代码片段
帮助
正在加载...
登录
切换导航
F
freeswitch
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
张华
freeswitch
Commits
7e2d375d
提交
7e2d375d
authored
2月 10, 2017
作者:
Shane Bryldt
提交者:
Mike Jerris
3月 22, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
FS-9952: Added envelope to queue sending messages, prepping for initial server transport testing
上级
fa6a4e76
隐藏空白字符变更
内嵌
并排
正在显示
5 个修改的文件
包含
132 行增加
和
85 行删除
+132
-85
blade_connection.c
libs/libblade/src/blade_connection.c
+86
-35
blade_identity.c
libs/libblade/src/blade_identity.c
+3
-0
blade_module_wss.c
libs/libblade/src/blade_module_wss.c
+28
-6
bladec.c
libs/libblade/test/bladec.c
+5
-8
bladec.cfg
libs/libblade/test/bladec.cfg
+10
-36
没有找到文件。
libs/libblade/src/blade_connection.c
浏览文件 @
7e2d375d
...
...
@@ -51,6 +51,49 @@ struct blade_connection_s {
//ks_q_t *receiving;
};
// @todo may want to make this reusable for session as it'll need to queue the same details during temporary connection loss
typedef
struct
blade_connection_sending_s
blade_connection_sending_t
;
struct
blade_connection_sending_s
{
ks_pool_t
*
pool
;
blade_identity_t
*
target
;
cJSON
*
json
;
};
ks_status_t
blade_connection_sending_create
(
blade_connection_sending_t
**
bcsP
,
ks_pool_t
*
pool
,
blade_identity_t
*
target
,
cJSON
*
json
)
{
blade_connection_sending_t
*
bcs
=
NULL
;
ks_assert
(
bcsP
);
ks_assert
(
pool
);
ks_assert
(
json
);
bcs
=
ks_pool_alloc
(
pool
,
sizeof
(
blade_connection_sending_t
));
bcs
->
pool
=
pool
;
bcs
->
target
=
target
;
bcs
->
json
=
json
;
*
bcsP
=
bcs
;
return
KS_STATUS_SUCCESS
;
}
ks_status_t
blade_connection_sending_destroy
(
blade_connection_sending_t
**
bcsP
)
{
blade_connection_sending_t
*
bcs
=
NULL
;
ks_assert
(
bcsP
);
ks_assert
(
*
bcsP
);
bcs
=
*
bcsP
;
if
(
bcs
->
target
)
blade_identity_destroy
(
&
bcs
->
target
);
if
(
bcs
->
json
)
cJSON_Delete
(
bcs
->
json
);
ks_pool_free
(
bcs
->
pool
,
bcsP
);
return
KS_STATUS_SUCCESS
;
}
void
*
blade_connection_state_thread
(
ks_thread_t
*
thread
,
void
*
data
);
...
...
@@ -74,7 +117,8 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
bc
->
transport_init_data
=
transport_init_data
;
bc
->
transport_callbacks
=
transport_callbacks
;
ks_q_create
(
&
bc
->
sending
,
pool
,
0
);
//ks_q_create(&bc->receiving, pool, 0);
ks_assert
(
bc
->
sending
);
*
bcP
=
bc
;
return
KS_STATUS_SUCCESS
;
...
...
@@ -92,7 +136,6 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
blade_connection_shutdown
(
bc
);
ks_q_destroy
(
&
bc
->
sending
);
//ks_q_destroy(&bc->receiving);
ks_pool_free
(
bc
->
pool
,
bcP
);
...
...
@@ -122,6 +165,8 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_c
KS_DECLARE
(
ks_status_t
)
blade_connection_shutdown
(
blade_connection_t
*
bc
)
{
blade_connection_sending_t
*
bcs
=
NULL
;
ks_assert
(
bc
);
if
(
bc
->
state_thread
)
{
...
...
@@ -131,8 +176,7 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
bc
->
shutdown
=
KS_FALSE
;
}
//while (ks_q_trypop(bc->sending, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
//while (ks_q_trypop(bc->receiving, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
while
(
ks_q_trypop
(
bc
->
sending
,
(
void
**
)
&
bcs
)
==
KS_STATUS_SUCCESS
&&
bcs
)
blade_connection_sending_destroy
(
&
bcs
);
return
KS_STATUS_SUCCESS
;
}
...
...
@@ -221,40 +265,40 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
KS_DECLARE
(
ks_status_t
)
blade_connection_sending_push
(
blade_connection_t
*
bc
,
blade_identity_t
*
target
,
cJSON
*
json
)
{
blade_connection_sending_t
*
bcs
=
NULL
;
ks_assert
(
bc
);
ks_assert
(
json
);
// @todo need internal envelope to wrap an identity object and a json object just for the queue
blade_connection_sending_create
(
&
bcs
,
bc
->
pool
,
target
,
json
);
ks_assert
(
bcs
);
return
KS_STATUS_SUCCESS
;
return
ks_q_push
(
bc
->
sending
,
bcs
)
;
}
KS_DECLARE
(
ks_status_t
)
blade_connection_sending_pop
(
blade_connection_t
*
bc
,
blade_identity_t
**
target
,
cJSON
**
json
)
{
ks_status_t
ret
=
KS_STATUS_SUCCESS
;
blade_connection_sending_t
*
bcs
=
NULL
;
ks_assert
(
bc
);
ks_assert
(
json
);
// @todo need internal envelope to wrap an identity object and a json object just for the queue
return
KS_STATUS_SUCCESS
;
}
ret
=
ks_q_trypop
(
bc
->
sending
,
(
void
**
)
&
bcs
);
// @todo may not need receiving queue on connection, by the time we are queueing we should have a session to receive into
//KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json)
//{
// ks_assert(bc);
// ks_assert(json);
if
(
bcs
)
{
if
(
target
)
*
target
=
bcs
->
target
;
*
json
=
bcs
->
json
;
// return ks_q_push(bc->receiving, json);
//}
bcs
->
target
=
NULL
;
bcs
->
json
=
NULL
;
blade_connection_sending_destroy
(
&
bcs
);
}
return
ret
;
}
//KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json)
//{
// ks_assert(bc);
// ks_assert(json);
// return ks_q_trypop(bc->receiving, (void **)json);
//}
void
*
blade_connection_state_thread
(
ks_thread_t
*
thread
,
void
*
data
)
{
...
...
@@ -262,6 +306,7 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
blade_connection_state_t
state
;
blade_transport_state_callback_t
callback
=
NULL
;
blade_connection_state_hook_t
hook
=
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
blade_identity_t
*
target
=
NULL
;
cJSON
*
json
=
NULL
;
ks_assert
(
thread
);
...
...
@@ -270,22 +315,28 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
bc
=
(
blade_connection_t
*
)
data
;
while
(
!
bc
->
shutdown
)
{
// @todo pop from connection sending queue and call transport callback to write one message (passing target identity too)
// and delete the cJSON object here after returning from callback
// @todo seems like connection will not need a receiving queue as the session will exist prior to async transmissions
state
=
bc
->
state
;
hook
=
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
callback
=
blade_connection_state_callback_lookup
(
bc
,
state
);
// @todo should this just go in the ready state callback? it's generalized here, so the callback for READY doesn't really
// need to do anything
if
(
state
==
BLADE_CONNECTION_STATE_READY
&&
bc
->
transport_callbacks
->
onreceive
(
bc
,
&
json
)
==
KS_STATUS_SUCCESS
&&
json
)
{
// @todo push json to session receiving queue
while
(
blade_connection_sending_pop
(
bc
,
&
target
,
&
json
)
==
KS_STATUS_SUCCESS
&&
json
)
{
if
(
bc
->
transport_callbacks
->
onsend
(
bc
,
target
,
json
)
!=
KS_STATUS_SUCCESS
)
{
blade_connection_disconnect
(
bc
);
break
;
}
}
if
(
state
==
BLADE_CONNECTION_STATE_READY
)
{
do
{
if
(
bc
->
transport_callbacks
->
onreceive
(
bc
,
&
json
)
!=
KS_STATUS_SUCCESS
)
{
blade_connection_disconnect
(
bc
);
break
;
}
if
(
json
)
{
// @todo push json to session receiving queue
}
}
while
(
json
)
;
}
if
(
callback
)
hook
=
callback
(
bc
,
BLADE_CONNECTION_STATE_CONDITION_POST
);
...
...
libs/libblade/src/blade_identity.c
浏览文件 @
7e2d375d
...
...
@@ -75,6 +75,9 @@ KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *u
ks_assert
(
uri
);
if
(
bi
->
uri
)
ks_pool_free
(
bi
->
pool
,
&
bi
->
uri
);
bi
->
uri
=
ks_pstrdup
(
bi
->
pool
,
uri
);
// @todo parse into components
return
KS_STATUS_SUCCESS
;
}
...
...
libs/libblade/src/blade_module_wss.c
浏览文件 @
7e2d375d
...
...
@@ -342,6 +342,8 @@ ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *co
bm_wss
=
(
blade_module_wss_t
*
)
blade_module_data_get
(
bm
);
// @todo register wss transport to the blade_handle_t
if
(
blade_module_wss_config
(
bm_wss
,
config
)
!=
KS_STATUS_SUCCESS
)
{
ks_log
(
KS_LOG_DEBUG
,
"blade_module_wss_config failed
\n
"
);
return
KS_STATUS_FAIL
;
...
...
@@ -381,6 +383,8 @@ ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm)
bm_wss
=
(
blade_module_wss_t
*
)
blade_module_data_get
(
bm
);
// @todo unregister wss transport from the blade_handle_t
if
(
bm_wss
->
listeners_thread
)
{
bm_wss
->
shutdown
=
KS_TRUE
;
ks_thread_join
(
bm_wss
->
listeners_thread
);
...
...
@@ -396,6 +400,9 @@ ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm)
bm_wss
->
listeners_count
=
0
;
if
(
bm_wss
->
listeners_poll
)
ks_pool_free
(
bm_wss
->
pool
,
&
bm_wss
->
listeners_poll
);
// @todo connections should be gracefully disconnected so that they detach from sessions properly
// which means this should occur before the listeners thread is terminated, which requires that
// the listener sockets be made inactive (or closed) to stop accepting while shutting down
while
(
ks_q_trypop
(
bm_wss
->
disconnected
,
(
void
**
)
&
bc
)
==
KS_STATUS_SUCCESS
)
;
list_iterator_start
(
&
bm_wss
->
connected
);
while
(
list_iterator_hasnext
(
&
bm_wss
->
connected
))
{
...
...
@@ -581,22 +588,30 @@ blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blad
ks_status_t
blade_transport_wss_write
(
blade_transport_wss_t
*
bt_wss
,
cJSON
*
json
)
{
ks_status_t
ret
=
KS_STATUS_SUCCESS
;
char
*
json_str
=
cJSON_PrintUnformatted
(
json
);
ks_size_t
json_str_len
=
0
;
if
(
!
json_str
)
{
// @todo error logging
return
KS_STATUS_FAIL
;
ret
=
KS_STATUS_FAIL
;
goto
done
;
}
json_str_len
=
strlen
(
json_str
)
+
1
;
// @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
kws_write_frame
(
bt_wss
->
kws
,
WSOC_TEXT
,
json_str
,
json_str_len
);
if
(
kws_write_frame
(
bt_wss
->
kws
,
WSOC_TEXT
,
json_str
,
json_str_len
)
!=
json_str_len
)
{
// @todo error logging
ret
=
KS_STATUS_FAIL
;
goto
done
;
}
free
(
json_str
);
done:
if
(
json_str
)
free
(
json_str
);
return
KS_STATUS_SUCCESS
;
return
ret
;
}
ks_status_t
blade_transport_wss_on_send
(
blade_connection_t
*
bc
,
blade_identity_t
*
target
,
cJSON
*
json
)
{
ks_status_t
ret
=
KS_STATUS_SUCCESS
;
blade_transport_wss_t
*
bt_wss
=
NULL
;
ks_assert
(
bc
);
...
...
@@ -606,7 +621,13 @@ ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t
bt_wss
=
(
blade_transport_wss_t
*
)
blade_connection_transport_get
(
bc
);
return
blade_transport_wss_write
(
bt_wss
,
json
);
ret
=
blade_transport_wss_write
(
bt_wss
,
json
);
// @todo use reference counting on blade_identity_t and cJSON objects
if
(
target
)
blade_identity_destroy
(
&
target
);
cJSON_Delete
(
json
);
return
ret
;
}
ks_status_t
blade_transport_wss_read
(
blade_transport_wss_t
*
bt_wss
,
cJSON
**
json
)
...
...
@@ -743,7 +764,8 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_
// @todo Establish sessid and discover existing session or create and register new session through BLADE commands
// Set session state to CONNECT if its new or RECONNECT if existing
// start session and its thread if its new
ks_sleep_ms
(
1000
);
// @todo temporary testing, remove this and return success once negotiations are done
return
BLADE_CONNECTION_STATE_HOOK_BYPASS
;
}
...
...
libs/libblade/test/bladec.c
浏览文件 @
7e2d375d
...
...
@@ -16,7 +16,6 @@ char g_console_input[CONSOLE_INPUT_MAX];
size_t
g_console_input_length
=
0
;
size_t
g_console_input_eol
=
0
;
void
service_peer_state_callback
(
blade_service_t
*
service
,
blade_peer_t
*
peer
,
blade_peerstate_t
state
);
void
loop
(
blade_handle_t
*
bh
);
void
process_console_input
(
blade_handle_t
*
bh
,
char
*
line
);
...
...
@@ -71,11 +70,13 @@ int main(int argc, char **argv)
return
EXIT_FAILURE
;
}
if
(
blade_handle_startup
(
bh
,
config_blade
,
service_peer_state_callback
)
!=
KS_STATUS_SUCCESS
)
{
if
(
blade_handle_startup
(
bh
,
config_blade
)
!=
KS_STATUS_SUCCESS
)
{
ks_log
(
KS_LOG_ERROR
,
"Blade startup failed
\n
"
);
return
EXIT_FAILURE
;
}
// @todo get to wss module callbacks, call onload to kick off registration
loop
(
bh
);
blade_handle_destroy
(
&
bh
);
...
...
@@ -85,12 +86,8 @@ int main(int argc, char **argv)
return
0
;
}
void
service_peer_state_callback
(
blade_service_t
*
service
,
blade_peer_t
*
peer
,
blade_peerstate_t
state
)
{
// @todo log output and pop peer messages if state == BLADE_PEERSTATE_RECEIVING
ks_log
(
KS_LOG_INFO
,
"service peer state callback: %d
\n
"
,
(
int
)
state
);
}
void
buffer_console_input
(
void
)
{
ssize_t
bytes
=
0
;
...
...
libs/libblade/test/bladec.cfg
浏览文件 @
7e2d375d
blade:
{
# client stuff, for peers who connect out to services
client:
{
directory:
{
# todo: hints for ways to find a directory service, at least kws client_data for now
# add DNS SRV in the future
uri = "???:127.0.0.1+2100:???"; # todo: confirm expected format, "uri:host:proto"
websocket:
{
# SSL group is optional, disabled when absent
ssl:
{
# todo: client SSL stuffs here
};
};
};
};
# server stuff, for services that peers connect to
# todo: consider encapsulating in a "server" group for organizational structure
datastore:
{
database:
...
...
@@ -30,21 +7,18 @@ blade:
path = ":mem:";
};
};
service
:
wss
:
{
websockets:
endpoints:
{
ipv4 = ( { address = "0.0.0.0", port = 2100 } );
ipv6 = ( { address = "::", port = 2100 } );
backlog = 128;
};
# SSL group is optional, disabled when absent
ssl:
{
endpoints:
{
ipv4 = ( { address = "0.0.0.0", port = 2100 } );
ipv6 = ( { address = "::", port = 2100 } );
backlog = 128;
};
# SSL group is optional, disabled when absent
ssl:
{
# todo: service SSL stuffs here
};
# todo: server SSL stuffs here
};
};
};
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论