提交 06e38ba1 authored 作者: colm's avatar colm 提交者: Mike Jerris

FS-9952: Add blade extention to rpc messages

上级 418092e1
......@@ -34,7 +34,6 @@
#pragma GCC optimize ("O0")
#include <blade_rpcproto.h>
#include <blade_message.h>
/*
* internal shared structure grounded in global
......@@ -700,6 +699,172 @@ KS_DECLARE(ks_status_t)blade_rpc_inherit_template(char *namespace, char* templat
}
/*
* create a request message
*/
KS_DECLARE(ks_rpcmessageid_t) blade_rpc_create_request(char *namespace,
char *method,
blade_rpc_fields_t* fields,
cJSON **paramsP,
cJSON **requestP)
{
cJSON *jversion = NULL;
blade_rpc_callbackpair_t* callbacks = NULL;
*requestP = NULL;
ks_hash_read_lock(g_handle->namespace_hash);
blade_rpc_namespace_t *n = ks_hash_search(g_handle->namespace_hash, namespace, KS_UNLOCKED);
if (n) {
ks_hash_read_lock(n->method_hash);
callbacks = ks_hash_search(n->method_hash, method, KS_UNLOCKED);
if (callbacks) {
jversion = cJSON_CreateString(n->version);
}
ks_hash_read_unlock(n->method_hash);
}
ks_hash_read_unlock(g_handle->namespace_hash);
if (!n) {
ks_log(KS_LOG_ERROR, "No namespace %s found\n", namespace);
return 0;
}
if (!callbacks) {
ks_log(KS_LOG_ERROR, "No method %s.%s found\n", namespace, method);
return 0;
}
ks_rpcmessageid_t msgid = ks_rpcmessage_create_request(namespace, method, paramsP, requestP);
if (!msgid || *requestP == NULL) {
ks_log(KS_LOG_ERROR, "Unable to create rpc message for method %s.%s\n", namespace, method);
return 0;
}
cJSON *jfields = cJSON_CreateObject();
cJSON_AddItemToObject(jfields, "version", jversion);
if (fields->to) {
cJSON_AddStringToObject(jfields, "to", fields->to);
}
if (fields->from) {
cJSON_AddStringToObject(jfields, "from", fields->from);
}
if (fields->token) {
cJSON_AddStringToObject(jfields, "token", fields->token);
}
cJSON_AddItemToObject(*requestP, "blade", jfields);
return msgid;
}
KS_DECLARE(ks_rpcmessageid_t) blade_rpc_create_response(cJSON *request,
cJSON **replyP,
cJSON **responseP)
{
cJSON *jfields = cJSON_GetObjectItem(request, "blade");
if (!jfields) {
ks_log(KS_LOG_ERROR, "No blade routing info found. Unable to create response\n");
return 0;
}
ks_rpcmessageid_t msgid = ks_rpcmessage_create_response(request, replyP, responseP);
if (!msgid || *responseP == NULL) {
ks_log(KS_LOG_ERROR, "Unable to create rpc response message\n"); //TODO : Add namespace, method from request
return 0;
}
const char *to = cJSON_GetObjectCstr(jfields, "to");
const char *from = cJSON_GetObjectCstr(jfields, "from");
const char *token = cJSON_GetObjectCstr(jfields, "token");
const char *version = cJSON_GetObjectCstr(jfields, "version");
cJSON *blade = cJSON_CreateObject();
if (to) {
cJSON_AddStringToObject(blade, "to", from);
}
if (from) {
cJSON_AddStringToObject(blade, "from", to);
}
if (token) {
cJSON_AddStringToObject(blade, "token", token);
}
if (version) {
cJSON_AddStringToObject(blade, "version", version);
}
cJSON_AddItemToObject(*responseP, "blade", blade);
return msgid;
}
const char BLADE_JRPC_METHOD[] = "method";
const char BLADE_JRPC_FIELDS[] = "blade";
const char BLADE_JRPC_TO[] = "to";
const char BLADE_JRPC_FROM[] = "from";
const char BLADE_JRPC_TOKEN[] = "token";
const char BLADE_JRPC_VERSION[] = "version";
KS_DECLARE(ks_status_t) blade_rpc_parse_message(cJSON *message,
char **namespaceP,
char **methodP,
char **versionP,
blade_rpc_fields_t **fieldsP)
{
const char *m = cJSON_GetObjectCstr(message, BLADE_JRPC_METHOD);
cJSON *blade = cJSON_GetObjectItem(message, BLADE_JRPC_FIELDS);
*fieldsP = NULL;
*namespaceP = NULL;
*versionP = NULL;
*methodP = NULL;
if (!m || !blade) {
const char *buffer = cJSON_PrintUnformatted(message);
ks_log(KS_LOG_ERROR, "Unable to locate necessary fields in message:\n%s\n", buffer);
ks_pool_free(g_handle->pool, buffer);
return KS_STATUS_FAIL;
}
ks_size_t len = KS_RPCMESSAGE_COMMAND_LENGTH + 1 +
KS_RPCMESSAGE_NAMESPACE_LENGTH + 1 +
KS_RPCMESSAGE_VERSION_LENGTH + 1 +
sizeof(blade_rpc_fields_t) + 1;
blade_rpc_fields_t *fields = (blade_rpc_fields_t *)ks_pool_alloc(g_handle->pool, len);
fields->to = cJSON_GetObjectCstr(blade, BLADE_JRPC_TO);
fields->from = cJSON_GetObjectCstr(blade, BLADE_JRPC_FROM);
fields->from = cJSON_GetObjectCstr(blade, BLADE_JRPC_TOKEN);
char *namespace = (char*)fields + sizeof(blade_rpc_fields_t);
char *command = namespace + KS_RPCMESSAGE_NAMESPACE_LENGTH + 1;
char *version = command + KS_RPCMESSAGE_COMMAND_LENGTH + 1;
blade_rpc_parse_fqcommand(m, namespace, command);
strcpy(version, cJSON_GetObjectCstr(blade, BLADE_JRPC_VERSION));
*fieldsP = fields;
*namespaceP = namespace;
*methodP = command;
return KS_STATUS_SUCCESS;
}
/*
* send message
......@@ -769,7 +934,7 @@ static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request)
if (!fqcommand) {
error = cJSON_CreateObject();
cJSON_AddStringToObject(error, "errormessage", "Command not specified");
ks_rpcmessage_create_request("rpcprotocol", "unknowncommand", NULL, NULL, &error, &responseP);
ks_rpcmessage_create_request("rpcprotocol", "unknowncommand", &error, &responseP);
blade_rpc_write_json(responseP);
return KS_STATUS_FAIL;
}
......@@ -907,23 +1072,23 @@ KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data,
return KS_STATUS_FAIL;
}
KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message)
{
uint8_t* data = NULL;
ks_size_t size = 0;
blade_message_get(message, (void **)&data, &size);
if (data && size>0) {
ks_status_t s = blade_rpc_process_data(data, size);
blade_message_discard(&message);
return s;
}
ks_log(KS_LOG_ERROR, "Message read failed\n");
return KS_STATUS_FAIL;
}
//KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message)
//{
// uint8_t* data = NULL;
// ks_size_t size = 0;
//
// blade_message_get(message, (void **)&data, &size);
//
// if (data && size>0) {
// ks_status_t s = blade_rpc_process_data(data, size);
// blade_message_discard(&message);
// return s;
// }
//
// ks_log(KS_LOG_ERROR, "Message read failed\n");
// return KS_STATUS_FAIL;
//
//}
/* For Emacs:
......
......@@ -39,7 +39,6 @@
// temp typedefs to get compile going
//typedef struct blade_peer_s blade_peer_t;
//typedef struct blade_message_s blade_message_t;
//typedef struct blade_event_s blade_event_t;
#define KS_RPCMESSAGE_NAMESPACE_LENGTH 16
......@@ -48,6 +47,20 @@
#define KS_RPCMESSAGE_VERSION_LENGTH 9
/*
* contents to add to the "blade" field in rpc
*/
typedef struct blade_rpc_fields_s {
const char *to;
const char *from;
const char *token;
} blade_rpc_fields_t;
enum jrpc_status_t {
JRPC_PASS = (1 << 0),
JRPC_SEND = (1 << 1),
......@@ -101,6 +114,24 @@ KS_DECLARE(ks_status_t)blade_rpc_register_template_function(char *name,
KS_DECLARE(ks_status_t)blade_rpc_inherit_template(char *namespace, char* template);
/*
* create a request message
*/
KS_DECLARE(ks_rpcmessageid_t) blade_rpc_create_request(char *namespace,
char *method,
blade_rpc_fields_t* fields,
cJSON **paramsP,
cJSON **requestP);
KS_DECLARE(ks_rpcmessageid_t) blade_rpc_create_response(cJSON *request,
cJSON **reply,
cJSON **response);
KS_DECLARE(ks_status_t) blade_rpc_parse_message(cJSON *message,
char **namespace,
char **method,
char **version,
blade_rpc_fields_t **fieldsP);
/*
* peer create/destroy
......@@ -121,7 +152,6 @@ KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json);
* process inbound message
* -----------------------
*/
KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message);
KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data, ks_size_t size);
KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request);
......
......@@ -45,7 +45,7 @@ KS_BEGIN_EXTERN_C
#define KS_RPCMESSAGE_VERSION_LENGTH 9
typedef uint32_t ks_rpcmessage_id;
typedef uint32_t ks_rpcmessageid_t;
KS_DECLARE(void) ks_rpcmessage_init(ks_pool_t *pool);
......@@ -54,26 +54,22 @@ KS_DECLARE(void*) ks_json_pool_alloc(ks_size_t size);
KS_DECLARE(void) ks_json_pool_free(void *ptr);
KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_request(char *namespace,
KS_DECLARE(ks_rpcmessageid_t) ks_rpcmessage_create_request(char *namespace,
char *method,
char *sessionid,
char *version,
cJSON **parmsP,
cJSON **paramsP,
cJSON **requestP);
KS_DECLARE(ks_size_t) ks_rpc_create_buffer(char *namespace,
char *method,
char *sessionid,
char *version,
cJSON **parmsP,
cJSON **paramsP,
ks_buffer_t *buffer);
KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_response(
KS_DECLARE(ks_rpcmessageid_t) ks_rpcmessage_create_response(
const cJSON *request,
cJSON **resultP,
cJSON **responseP);
KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_errorresponse(
KS_DECLARE(ks_rpcmessageid_t) ks_rpcmessage_create_errorresponse(
const cJSON *request,
cJSON **errorP,
cJSON **responseP);
......
......@@ -48,6 +48,16 @@ struct
} handle = {NULL, 0, NULL};
const char PROTOCOL[] = "jsonrpc";
const char PROTOCOL_VERSION[] = "2.0";
const char ID[] = "id";
const char METHOD[] = "method";
const char PARAMS[] = "params";
const char ERROR[] = "error";
const char RESULT[] = "result";
KS_DECLARE(void*) ks_json_pool_alloc(ks_size_t size)
{
return ks_pool_alloc(handle.pool, size);
......@@ -96,10 +106,10 @@ static uint32_t ks_rpcmessage_next_id()
static cJSON *ks_rpcmessage_new(uint32_t id)
{
cJSON *obj = cJSON_CreateObject();
cJSON_AddItemToObject(obj, "jsonrpc", cJSON_CreateString("2.0"));
cJSON_AddItemToObject(obj, PROTOCOL, cJSON_CreateString(PROTOCOL_VERSION));
if (id) {
cJSON_AddItemToObject(obj, "id", cJSON_CreateNumber(id));
cJSON_AddItemToObject(obj, ID, cJSON_CreateNumber(id));
}
return obj;
......@@ -108,10 +118,10 @@ static cJSON *ks_rpcmessage_new(uint32_t id)
static cJSON *ks_rpcmessage_dup(cJSON *msgid)
{
cJSON *obj = cJSON_CreateObject();
cJSON_AddItemToObject(obj, "jsonrpc", cJSON_CreateString("2.0"));
cJSON_AddItemToObject(obj, PROTOCOL, cJSON_CreateString(PROTOCOL_VERSION));
if (msgid) {
cJSON_AddItemToObject(obj, "id", cJSON_Duplicate(msgid, 0));
cJSON_AddItemToObject(obj, ID, cJSON_Duplicate(msgid, 0));
}
return obj;
......@@ -119,8 +129,8 @@ static cJSON *ks_rpcmessage_dup(cJSON *msgid)
KS_DECLARE(ks_bool_t) ks_rpcmessage_isrequest(cJSON *msg)
{
cJSON *result = cJSON_GetObjectItem(msg, "result");
cJSON *error = cJSON_GetObjectItem(msg, "error");
cJSON *result = cJSON_GetObjectItem(msg, RESULT);
cJSON *error = cJSON_GetObjectItem(msg, ERROR);
if (result || error) {
return KS_FALSE;
......@@ -131,7 +141,7 @@ KS_DECLARE(ks_bool_t) ks_rpcmessage_isrequest(cJSON *msg)
KS_DECLARE(ks_bool_t) ks_rpcmessage_isrpc(cJSON *msg)
{
cJSON *rpc = cJSON_GetObjectItem(msg, "json-rpc");
cJSON *rpc = cJSON_GetObjectItem(msg, PROTOCOL);
if (rpc) {
return KS_FALSE;
......@@ -143,51 +153,28 @@ KS_DECLARE(ks_bool_t) ks_rpcmessage_isrpc(cJSON *msg)
KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_request(char *namespace,
KS_DECLARE(ks_rpcmessageid_t) ks_rpcmessage_create_request(char *namespace,
char *command,
char *sessionid,
char *version,
cJSON **paramsP,
cJSON **requestP)
{
cJSON *msg, *params = NULL;
*requestP = NULL;
ks_rpcmessage_id msgid = ks_rpcmessage_next_id();
ks_rpcmessageid_t msgid = ks_rpcmessage_next_id();
msg = ks_rpcmessage_new(msgid);
if (paramsP && *paramsP) { /* parameters have been passed */
cJSON *p = *paramsP;
if (paramsP) {
if (p->type != cJSON_Object) { /* need to wrap this in a param field */
params = cJSON_CreateObject();
cJSON_AddItemToObject(params, "param", p);
}
else {
if (*paramsP) { /* parameters have been passed */
params = *paramsP;
}
cJSON *v = cJSON_GetObjectItem(params, "version");
if (!v) { /* add version if needed */
cJSON_AddStringToObject(params, "version", version);
}
else {
cJSON_AddStringToObject(params, "version", "0");
}
}
if (!params) {
params = cJSON_CreateObject();
if (version && version[0] != 0) {
cJSON_AddStringToObject(params, "version", version);
}
else {
cJSON_AddStringToObject(params, "version", "0");
*paramsP = params;
}
cJSON_AddItemToObject(msg, PARAMS, params);
}
char fqcommand[KS_RPCMESSAGE_FQCOMMAND_LENGTH];
......@@ -195,17 +182,7 @@ KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_request(char *namespace,
sprintf(fqcommand, "%s.%s", namespace, command);
cJSON_AddItemToObject(msg, "method", cJSON_CreateString(fqcommand));
if (sessionid && sessionid[0] != 0) {
cJSON_AddStringToObject(params, "sessionid", sessionid);
}
cJSON_AddItemToObject(msg, "params", params);
if (paramsP) {
*paramsP = params;
}
cJSON_AddItemToObject(msg, METHOD, cJSON_CreateString(fqcommand));
*requestP = msg;
return msgid;
......@@ -213,19 +190,22 @@ KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_request(char *namespace,
KS_DECLARE(ks_size_t) ks_rpc_create_buffer(char *namespace,
char *method,
char *sessionid,
char *version,
cJSON **parms,
cJSON **params,
ks_buffer_t *buffer)
{
cJSON *message;
ks_rpcmessage_id msgid = ks_rpcmessage_create_request(namespace, method, sessionid, version, parms, &message);
ks_rpcmessageid_t msgid = ks_rpcmessage_create_request(namespace, method, params, &message);
if (!msgid) {
return 0;
}
if ( (*params)->child == NULL) {
cJSON_AddNullToObject(*params, "bladenull");
}
const char* b = cJSON_PrintUnformatted(message);
ks_size_t size = strlen(b);
......@@ -236,14 +216,14 @@ KS_DECLARE(ks_size_t) ks_rpc_create_buffer(char *namespace,
}
static ks_rpcmessage_id ks_rpcmessage_get_messageid(const cJSON *msg, cJSON **cmsgidP)
static ks_rpcmessageid_t ks_rpcmessage_get_messageid(const cJSON *msg, cJSON **cmsgidP)
{
uint32_t msgid = 0;
ks_rpcmessageid_t msgid = 0;
cJSON *cmsgid = cJSON_GetObjectItem(msg, "id");
cJSON *cmsgid = cJSON_GetObjectItem(msg, ID);
if (cmsgid->type == cJSON_Number) {
msgid = (uint32_t) cmsgid->valueint;
msgid = (ks_rpcmessageid_t) cmsgid->valueint;
}
*cmsgidP = cmsgid;
......@@ -252,24 +232,17 @@ static ks_rpcmessage_id ks_rpcmessage_get_messageid(const cJSON *msg, cJSON **cm
}
static ks_rpcmessage_id ks_rpcmessage_new_response(
static ks_rpcmessageid_t ks_rpcmessage_new_response(
const cJSON *request,
cJSON *result,
cJSON **result,
cJSON **pmsg)
{
cJSON *respmsg = NULL;
cJSON *cmsgid = NULL;
cJSON *version = NULL;
cJSON *sessionid = NULL;
cJSON *command = cJSON_GetObjectItem(request, "method");
cJSON *params = cJSON_GetObjectItem(request, "params");
if (params) {
version = cJSON_GetObjectItem(request, "version");
}
cJSON *command = cJSON_GetObjectItem(request, METHOD);
ks_rpcmessage_id msgid = ks_rpcmessage_get_messageid(request, &cmsgid );
ks_rpcmessageid_t msgid = ks_rpcmessage_get_messageid(request, &cmsgid );
if (!msgid || !command) {
return 0;
......@@ -277,91 +250,51 @@ static ks_rpcmessage_id ks_rpcmessage_new_response(
*pmsg = respmsg = ks_rpcmessage_dup(cmsgid);
cJSON_AddItemToObject(respmsg, "method", cJSON_Duplicate(command, 0));
if (result) {
cJSON *params = cJSON_GetObjectItem(request, "params");
if (params) {
version = cJSON_GetObjectItem(params, "version");
if (version) {
cJSON_AddItemToObject(result, "version", cJSON_Duplicate(version, 0));
}
sessionid = cJSON_GetObjectItem(params, "sessionid");
if (sessionid) {
cJSON_AddItemToObject(result, "sessionid", cJSON_Duplicate(sessionid, 0));
}
}
cJSON_AddItemToObject(respmsg, METHOD, cJSON_Duplicate(command, 0));
cJSON_AddItemToObject(respmsg, "result", result);
if (result && *result) {
cJSON_AddItemToObject(respmsg, RESULT, *result);
}
return msgid;
}
KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_response(
KS_DECLARE(ks_rpcmessageid_t) ks_rpcmessage_create_response(
const cJSON *request,
cJSON **resultP,
cJSON **responseP)
{
ks_rpcmessage_id msgid = ks_rpcmessage_new_response(request, *resultP, responseP);
ks_rpcmessageid_t msgid = ks_rpcmessage_new_response(request, resultP, responseP);
cJSON *respmsg = *responseP;
if (msgid) {
if (*resultP == NULL) {
*resultP = cJSON_CreateObject();
cJSON *result = *resultP;
cJSON *params = cJSON_GetObjectItem(request, "params");
if (params) {
cJSON *version = cJSON_GetObjectItem(request, "version");
cJSON *sessionid = cJSON_GetObjectItem(request, "sessionid");
if (version) {
cJSON_AddItemToObject(result, "version", cJSON_Duplicate(version, 0));
}
else {
cJSON_AddStringToObject(result, "version", "0");
}
if (sessionid) {
cJSON_AddItemToObject(result, "sessionid", cJSON_Duplicate(sessionid, 0));
}
}
else {
cJSON_AddStringToObject(result, "version", "0");
}
cJSON_AddItemToObject(respmsg, "result", result);
if (resultP && *resultP == NULL) {
cJSON *result = cJSON_CreateObject();
*resultP = result;
cJSON_AddItemToObject(respmsg, RESULT, result);
}
}
return msgid;
}
KS_DECLARE(ks_rpcmessage_id) ks_rpcmessage_create_errorresponse(
KS_DECLARE(ks_rpcmessageid_t) ks_rpcmessage_create_errorresponse(
const cJSON *request,
cJSON **errorP,
cJSON **responseP)
{
ks_rpcmessage_id msgid = ks_rpcmessage_new_response(request, *errorP, responseP);
ks_rpcmessageid_t msgid = ks_rpcmessage_new_response(request, errorP, responseP);
cJSON *respmsg = *responseP;
if (msgid) {
if (*errorP == NULL) {
*errorP = cJSON_CreateObject();
cJSON_AddItemToObject(respmsg, "error", *errorP);
if (errorP && *errorP == NULL) {
cJSON *error = cJSON_CreateObject();
*errorP = error;
cJSON_AddItemToObject(respmsg, ERROR, error);
}
}
......
......@@ -18,8 +18,8 @@ void test01()
cJSON* parms1 = NULL;
cJSON* response1 = NULL;
/*namespace, method, sessionid, version, params, **request */
ks_rpcmessage_id msgid = ks_rpcmessage_create_request("app1", "func1", "s001", "1.0", &parms1, &request1);
/*namespace, method, params, **request */
ks_rpcmessageid_t msgid = ks_rpcmessage_create_request("app1", "func1", &parms1, &request1);
if (msgid == 0) {
printf("message create failed %d\n", msgid);
}
......@@ -37,7 +37,7 @@ void test01()
ks_buffer_create(&buffer, 256, 256, 1024);
ks_size_t n = ks_rpc_create_buffer("app2", "func2", "s002", "1.1", &parms2, buffer);
ks_size_t n = ks_rpc_create_buffer("app2", "func2", &parms2, buffer);
ks_size_t size = ks_buffer_len(buffer);
char *b = (char *)ks_pool_alloc(pool, size+1);
......@@ -51,7 +51,7 @@ void test01()
cJSON *parms3 = cJSON_CreateNumber(1);
cJSON *request3 = NULL;
msgid = ks_rpcmessage_create_request("app1", "badbunny", "s002", "1.1", &parms3, &request3);
msgid = ks_rpcmessage_create_request("app1", "badbunny", &parms3, &request3);
data = cJSON_PrintUnformatted(request3);
printf("\ntest01i request: %d\n%s\n\n", msgid, data);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论