提交 749db046 authored 作者: Shane Bryldt's avatar Shane Bryldt

FS-10739: [libblade] Added TTL to request which now produces an error response…

FS-10739: [libblade] Added TTL to request which now produces an error response when a response is not received locally within the timeout, this utilizes loopback session stuff which had a couple bugs that are now also fixed, further loopback testing is still required on event subscriptions, protocol publishing, protocol locating, and protocol execution
上级 b3e84ac1
...@@ -50,9 +50,11 @@ struct blade_rpc_request_s { ...@@ -50,9 +50,11 @@ struct blade_rpc_request_s {
cJSON *message; cJSON *message;
const char *message_id; // pulled from message for easier keying const char *message_id; // pulled from message for easier keying
ks_time_t ttl;
blade_rpc_response_callback_t callback; blade_rpc_response_callback_t callback;
void *data; void *data;
// @todo ttl to wait for response before injecting an error response locally
}; };
struct blade_rpc_response_s { struct blade_rpc_response_s {
...@@ -253,6 +255,22 @@ KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *br ...@@ -253,6 +255,22 @@ KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *br
return brpcreq->message_id; return brpcreq->message_id;
} }
KS_DECLARE(ks_status_t) blade_rpc_request_ttl_set(blade_rpc_request_t *brpcreq, ks_time_t ttl)
{
ks_assert(brpcreq);
brpcreq->ttl = ks_time_now() + (ttl * KS_USEC_PER_SEC);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_bool_t) blade_rpc_request_expired(blade_rpc_request_t *brpcreq)
{
ks_assert(brpcreq);
return brpcreq->ttl <= ks_time_now();
}
KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq) KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq)
{ {
ks_assert(brpcreq); ks_assert(brpcreq);
......
...@@ -39,7 +39,9 @@ struct blade_rpcmgr_s { ...@@ -39,7 +39,9 @@ struct blade_rpcmgr_s {
ks_hash_t *corerpcs; // method, blade_rpc_t* ks_hash_t *corerpcs; // method, blade_rpc_t*
ks_hash_t *protocolrpcs; // method, blade_rpc_t* ks_hash_t *protocolrpcs; // method, blade_rpc_t*
ks_hash_t *requests; // id, KS_TRUE ks_hash_t *requests; // id, blade_rpc_request_t*
ks_time_t requests_ttlcheck;
ks_mutex_t* requests_ttlcheck_lock;
}; };
...@@ -100,6 +102,9 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_create(blade_rpcmgr_t **brpcmgrP, blade_han ...@@ -100,6 +102,9 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_create(blade_rpcmgr_t **brpcmgrP, blade_han
ks_hash_create(&brpcmgr->requests, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool); ks_hash_create(&brpcmgr->requests, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
ks_assert(brpcmgr->requests); ks_assert(brpcmgr->requests);
ks_mutex_create(&brpcmgr->requests_ttlcheck_lock, KS_MUTEX_FLAG_NON_RECURSIVE, pool);
ks_assert(brpcmgr->requests_ttlcheck_lock);
ks_pool_set_cleanup(brpcmgr, NULL, blade_rpcmgr_cleanup); ks_pool_set_cleanup(brpcmgr, NULL, blade_rpcmgr_cleanup);
*brpcmgrP = brpcmgr; *brpcmgrP = brpcmgr;
...@@ -218,7 +223,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_add(blade_rpcmgr_t *brpcmgr, bl ...@@ -218,7 +223,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_add(blade_rpcmgr_t *brpcmgr, bl
ks_log(KS_LOG_DEBUG, "ProtocolRPC Added: %s\n", key); ks_log(KS_LOG_DEBUG, "ProtocolRPC Added: %s\n", key);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -294,6 +299,59 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, bla ...@@ -294,6 +299,59 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, bla
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_rpcmgr_request_timeouts(blade_rpcmgr_t *brpcmgr)
{
blade_session_t *loopback = NULL;
ks_assert(brpcmgr);
// All this stuff is to ensure that the requests hash is not locked up when it does not need to be,
// and this will also ensure that sessions will not lock up if any other session is trying to deal
// with timeouts already
if (ks_mutex_trylock(brpcmgr->requests_ttlcheck_lock) != KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
if (brpcmgr->requests_ttlcheck > ks_time_now()) {
ks_mutex_unlock(brpcmgr->requests_ttlcheck_lock);
return KS_STATUS_SUCCESS;
}
ks_hash_write_lock(brpcmgr->requests);
// Give a one second delay between timeout checking
brpcmgr->requests_ttlcheck = ks_time_now() + KS_USEC_PER_SEC;
ks_mutex_unlock(brpcmgr->requests_ttlcheck_lock);
// Now find all the expired requests and send out loopback error responses, which will invoke request removal when received and processed
for (ks_hash_iterator_t *it = ks_hash_first(brpcmgr->requests, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
blade_rpc_request_t *value = NULL;
cJSON *res = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
if (!blade_rpc_request_expired(value)) continue;
if (!loopback) loopback = blade_sessionmgr_loopback_lookup(blade_handle_sessionmgr_get(brpcmgr->handle));
ks_log(KS_LOG_DEBUG, "Request (%s) TTL timeout\n", key);
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(value), -32000, "Request timed out");
// @todo may want to include requester-nodeid and responder-nodeid into the error block when they are present in the original request
// even though a response or error without them is treated as locally processed, it may be useful to know who the request was attempted with
// when multiple options are available such as a blade.execute where the protocol has multiple possible controllers to pick from
blade_session_send(loopback, res, 0, NULL, NULL);
cJSON_Delete(res);
}
if (loopback) blade_session_read_unlock(loopback);
ks_hash_write_unlock(brpcmgr->requests);
return KS_STATUS_SUCCESS;
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
* mode:c * mode:c
......
...@@ -412,7 +412,8 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j ...@@ -412,7 +412,8 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j
ks_assert(bs); ks_assert(bs);
ks_assert(json); ks_assert(json);
if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) ret = blade_session_receiving_push(bs, json); if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK)
ret = blade_session_receiving_push(bs, json);
else { else {
json_copy = cJSON_Duplicate(json, 1); json_copy = cJSON_Duplicate(json, 1);
if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond); if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond);
...@@ -477,8 +478,6 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) ...@@ -477,8 +478,6 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
// we can start stuffing any messages queued for output on the session straight to the connection right away, may need to only // we can start stuffing any messages queued for output on the session straight to the connection right away, may need to only
// do this when in session ready state but there may be implications of other states sending messages through the session // do this when in session ready state but there may be implications of other states sending messages through the session
while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) { while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
// @todo short-circuit with blade_session_receiving_push on the same session if the message has responder-nodeid == requester-nodeid == local_nodeid
// which would allow a system to send messages to itself, such as calling a protocolrpc immediately without bouncing upstream first
blade_connection_sending_push(bc, json); blade_connection_sending_push(bc, json);
cJSON_Delete(json); cJSON_Delete(json);
} }
...@@ -561,11 +560,13 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs) ...@@ -561,11 +560,13 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
cJSON_Delete(json); cJSON_Delete(json);
} }
blade_rpcmgr_request_timeouts(blade_handle_rpcmgr_get(blade_session_handle_get(bs)));
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data) KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data)
{ {
blade_rpc_request_t *brpcreq = NULL; blade_rpc_request_t *brpcreq = NULL;
const char *method = NULL; const char *method = NULL;
...@@ -580,22 +581,19 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla ...@@ -580,22 +581,19 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla
ks_assert(id); ks_assert(id);
if (method) { if (method) {
// @note This is scenario 1
// 1) Sending a request (client: method caller or consumer)
ks_log(KS_LOG_DEBUG, "Session (%s) sending request (%s) for %s\n", bs->id, id, method); ks_log(KS_LOG_DEBUG, "Session (%s) sending request (%s) for %s\n", bs->id, id, method);
blade_rpc_request_create(&brpcreq, bs->handle, ks_pool_get(bs->handle), bs->id, json, callback, data); blade_rpc_request_create(&brpcreq, bs->handle, ks_pool_get(bs->handle), bs->id, json, callback, data);
ks_assert(brpcreq); ks_assert(brpcreq);
// @todo set request TTL and figure out when requests are checked for expiration (separate thread in the handle?) if (ttl <= 0) ttl = 10;
blade_rpc_request_ttl_set(brpcreq, ttl);
blade_rpcmgr_request_add(blade_handle_rpcmgr_get(bs->handle), brpcreq); blade_rpcmgr_request_add(blade_handle_rpcmgr_get(bs->handle), brpcreq);
} else { } else {
// @note This is scenario 3
// 3) Sending a response or error (server: method callee or provider)
ks_log(KS_LOG_DEBUG, "Session (%s) sending response (%s)\n", bs->id, id); ks_log(KS_LOG_DEBUG, "Session (%s) sending response (%s)\n", bs->id, id);
} }
//blade_session_sending_push(bs, json);
if (!bs->connection) { if (!bs->connection) {
blade_session_sending_push(bs, json); blade_session_sending_push(bs, json);
} else { } else {
...@@ -653,13 +651,15 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -653,13 +651,15 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
blade_rpc_t *brpc = NULL; blade_rpc_t *brpc = NULL;
blade_rpc_request_callback_t callback = NULL; blade_rpc_request_callback_t callback = NULL;
cJSON *params = NULL; cJSON *params = NULL;
const char *params_requester_nodeid = NULL;
const char *params_responder_nodeid = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method); ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method);
params = cJSON_GetObjectItem(json, "params"); params = cJSON_GetObjectItem(json, "params");
if (params) { if (params) {
const char *params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid"); params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid");
const char *params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid"); params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid");
if (params_requester_nodeid && params_responder_nodeid && !blade_routemgr_local_check(blade_handle_routemgr_get(bh), params_responder_nodeid)) { if (params_requester_nodeid && params_responder_nodeid && !blade_routemgr_local_check(blade_handle_routemgr_get(bh), params_responder_nodeid)) {
// not meant for local processing, continue with standard unicast routing for requests // not meant for local processing, continue with standard unicast routing for requests
blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid); blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid);
...@@ -677,7 +677,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -677,7 +677,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid); cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid);
cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
blade_session_send(bs, res, NULL, NULL); blade_session_send(bs, res, 0, NULL, NULL);
return KS_STATUS_DISCONNECTED; return KS_STATUS_DISCONNECTED;
} }
} }
...@@ -687,7 +687,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -687,7 +687,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
} }
ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router)); ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router));
blade_session_send(bs_router, json, NULL, NULL); blade_session_send(bs_router, json, 0, NULL, NULL);
blade_session_read_unlock(bs_router); blade_session_read_unlock(bs_router);
// @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which // @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which
...@@ -704,8 +704,18 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -704,8 +704,18 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
brpc = blade_rpcmgr_corerpc_lookup(blade_handle_rpcmgr_get(bs->handle), method); brpc = blade_rpcmgr_corerpc_lookup(blade_handle_rpcmgr_get(bs->handle), method);
if (!brpc) { if (!brpc) {
cJSON *res = NULL;
cJSON *res_error = NULL;
ks_log(KS_LOG_DEBUG, "Received unknown rpc method %s\n", method); ks_log(KS_LOG_DEBUG, "Received unknown rpc method %s\n", method);
// @todo send error response, code = -32601 (method not found) blade_rpc_error_raw_create(&res, &res_error, id, -32601, "RPC method not found");
// needed in case this error must propagate further than the session which sent it
if (params_requester_nodeid) cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid);
if (params_responder_nodeid) cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid);
blade_session_send(bs, res, 0, NULL, NULL);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
callback = blade_rpc_callback_get(brpc); callback = blade_rpc_callback_get(brpc);
...@@ -750,7 +760,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -750,7 +760,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
} }
ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router)); ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router));
blade_session_send(bs_router, json, NULL, NULL); blade_session_send(bs_router, json, 0, NULL, NULL);
blade_session_read_unlock(bs_router); blade_session_read_unlock(bs_router);
// @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a // @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a
......
...@@ -149,6 +149,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, conf ...@@ -149,6 +149,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, conf
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback)); ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback));
blade_sessionmgr_session_add(bsmgr, bsmgr->loopback);
blade_session_state_set(bsmgr->loopback, BLADE_SESSION_STATE_STARTUP);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
...@@ -158,10 +162,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr) ...@@ -158,10 +162,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr)
ks_assert(bsmgr); ks_assert(bsmgr);
if (bsmgr->loopback) { //if (bsmgr->loopback) {
blade_session_hangup(bsmgr->loopback); // blade_session_hangup(bsmgr->loopback);
ks_sleep_ms(100); // ks_sleep_ms(100);
} //}
ks_hash_read_lock(bsmgr->sessions); ks_hash_read_lock(bsmgr->sessions);
for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) { for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
......
...@@ -483,7 +483,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t ...@@ -483,7 +483,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
ks_log(KS_LOG_DEBUG, "Broadcasting: protocol %s, channel %s through %s\n", protocol, channel, blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Broadcasting: protocol %s, channel %s through %s\n", protocol, channel, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data); blade_session_send(bs, req, 0, callback, data);
cJSON_Delete(req); cJSON_Delete(req);
......
...@@ -57,6 +57,8 @@ KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *b ...@@ -57,6 +57,8 @@ KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *b
KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq); KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_rpc_request_ttl_set(blade_rpc_request_t *brpcreq, ks_time_t ttl);
KS_DECLARE(ks_bool_t) blade_rpc_request_expired(blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq); KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq); KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
......
...@@ -48,6 +48,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_remove(blade_rpcmgr_t *brpcmgr, ...@@ -48,6 +48,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_remove(blade_rpcmgr_t *brpcmgr,
KS_DECLARE(blade_rpc_request_t *) blade_rpcmgr_request_lookup(blade_rpcmgr_t *brpcmgr, const char *id); KS_DECLARE(blade_rpc_request_t *) blade_rpcmgr_request_lookup(blade_rpcmgr_t *brpcmgr, const char *id);
KS_DECLARE(ks_status_t) blade_rpcmgr_request_add(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq); KS_DECLARE(ks_status_t) blade_rpcmgr_request_add(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq); KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_rpcmgr_request_timeouts(blade_rpcmgr_t *brpcmgr);
KS_END_EXTERN_C KS_END_EXTERN_C
#endif #endif
......
...@@ -61,7 +61,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs); ...@@ -61,7 +61,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs); KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs); KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id); KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json); KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
......
...@@ -68,7 +68,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char ...@@ -68,7 +68,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq); KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
......
...@@ -368,7 +368,7 @@ void command_join(blade_handle_t *bh, char *args) ...@@ -368,7 +368,7 @@ void command_join(blade_handle_t *bh, char *args)
} }
params = cJSON_CreateObject(); params = cJSON_CreateObject();
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", params, test_join_response_handler, NULL); blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", params, 0, test_join_response_handler, NULL);
cJSON_Delete(params); cJSON_Delete(params);
} }
...@@ -417,7 +417,7 @@ void command_leave(blade_handle_t *bh, char *args) ...@@ -417,7 +417,7 @@ void command_leave(blade_handle_t *bh, char *args)
} }
params = cJSON_CreateObject(); params = cJSON_CreateObject();
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", params, test_leave_response_handler, NULL); blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", params, 0, test_leave_response_handler, NULL);
cJSON_Delete(params); cJSON_Delete(params);
} }
...@@ -440,7 +440,7 @@ void command_talk(blade_handle_t *bh, char *args) ...@@ -440,7 +440,7 @@ void command_talk(blade_handle_t *bh, char *args)
params = cJSON_CreateObject(); params = cJSON_CreateObject();
cJSON_AddStringToObject(params, "text", args); cJSON_AddStringToObject(params, "text", args);
//blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", params, test_talk_response_handler, NULL); //blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", params, test_talk_response_handler, NULL);
blade_handle_rpcexecute(bh, "blade:testcon@freeswitch.com", "test.talk", "test", params, test_talk_response_handler, NULL); blade_handle_rpcexecute(bh, "blade:testcon@freeswitch.com", "test.talk", "test", params, 0, test_talk_response_handler, NULL);
cJSON_Delete(params); cJSON_Delete(params);
} }
......
...@@ -180,7 +180,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data) ...@@ -180,7 +180,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
// send rpcexecute response to the requester // send rpcexecute response to the requester
result = cJSON_CreateObject(); result = cJSON_CreateObject();
blade_rpcexecute_response_send(brpcreq, result); //blade_rpcexecute_response_send(brpcreq, result);
cJSON_Delete(result); cJSON_Delete(result);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论