SPDK实现了一种基于JSON-RPC的服务,以client-server方式完成服务的配置、运行状态监控等操作。
spdk rpc机制主要分为三部分:
1、rpc server端服务启动
当spdk服务如nvmf_tgt或app启动时会调用spdk_app_start,spdk_app_start会创建一个名为“app_thread”的线程,在该线程中通过调用spdk_rpc_initialize实现rpc server端服务的启动,主要包括启动socket监听并注册连接事件处理handler,在app thread线程注册timed poller:rpc_subsystem_poll。
/*
 * Start the server side of the RPC service.
 *
 * Verifies the registered methods, starts listening on listen_addr, moves the
 * RPC state to SPDK_RPC_STARTUP and registers rpc_subsystem_poll as a poller
 * with period RPC_SELECT_INTERVAL.
 *
 * Returns -EINVAL when spdk_rpc_verify_methods() fails. Returns 0 in every
 * other case, including a NULL listen_addr and a failure to start listening
 * (the latter is only logged).
 */
int
spdk_rpc_initialize(const char *listen_addr)
{
	/* No address requested: not treated as an error. */
	if (listen_addr == NULL) {
		return 0;
	}

	if (!spdk_rpc_verify_methods()) {
		return -EINVAL;
	}

	/* Listen on the requested address. */
	if (spdk_rpc_listen(listen_addr) != 0) {
		SPDK_ERRLOG("Unable to start RPC service at %s\n", listen_addr);
		/* TODO: Eventually, treat this as an error. Historically it has not
		 * been one, and many tests rely on this failing gracefully. */
		return 0;
	}

	spdk_rpc_set_state(SPDK_RPC_STARTUP);

	/* Periodically check for incoming RPCs. */
	g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll, NULL, RPC_SELECT_INTERVAL);

	return 0;
}
在spdk_rpc_initialize中spdk_rpc_listen主要用于启动socket服务的监听,注册jsonrpc_handler去处理rpc请求。
/*
 * Start listening for JSON-RPC connections on a Unix domain socket.
 *
 * listen_addr is copied into g_rpc_listen_addr_unix.sun_path, and a companion
 * lock file named "<listen_addr>.lock" is opened and locked exclusively
 * (non-blocking) before the socket path is reused. jsonrpc_handler is passed
 * to spdk_jsonrpc_server_listen() as the request handler.
 *
 * Returns 0 on success and -1 on any failure. On failure the lock file
 * descriptor, if it was opened, is closed again and g_rpc_lock_fd is -1.
 */
int
spdk_rpc_listen(const char *listen_addr)
{
	int rc;

	memset(&g_rpc_listen_addr_unix, 0, sizeof(g_rpc_listen_addr_unix));

	g_rpc_listen_addr_unix.sun_family = AF_UNIX;
	rc = snprintf(g_rpc_listen_addr_unix.sun_path,
		      sizeof(g_rpc_listen_addr_unix.sun_path),
		      "%s", listen_addr);
	if (rc < 0 || (size_t)rc >= sizeof(g_rpc_listen_addr_unix.sun_path)) {
		SPDK_ERRLOG("RPC Listen address Unix socket path too long\n");
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		return -1;
	}

	rc = snprintf(g_rpc_lock_path, sizeof(g_rpc_lock_path), "%s.lock",
		      g_rpc_listen_addr_unix.sun_path);
	if (rc < 0 || (size_t)rc >= sizeof(g_rpc_lock_path)) {
		SPDK_ERRLOG("RPC lock path too long\n");
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	g_rpc_lock_fd = open(g_rpc_lock_path, O_RDWR | O_CREAT, 0600);
	if (g_rpc_lock_fd == -1) {
		SPDK_ERRLOG("Cannot open lock file %s: %s\n",
			    g_rpc_lock_path, spdk_strerror(errno));
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	rc = flock(g_rpc_lock_fd, LOCK_EX | LOCK_NB);
	if (rc != 0) {
		SPDK_ERRLOG("RPC Unix domain socket path %s in use. Specify another.\n",
			    g_rpc_listen_addr_unix.sun_path);
		/*
		 * Release our descriptor so it is not leaked. The lock file itself
		 * is left in place because we did not acquire the lock on it.
		 */
		close(g_rpc_lock_fd);
		g_rpc_lock_fd = -1;
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	/*
	 * Since we acquired the lock, it is safe to delete the Unix socket file
	 * if it still exists from a previous process.
	 */
	unlink(g_rpc_listen_addr_unix.sun_path);

	g_jsonrpc_server = spdk_jsonrpc_server_listen(AF_UNIX, 0,
			   (struct sockaddr *)&g_rpc_listen_addr_unix,
			   sizeof(g_rpc_listen_addr_unix),
			   jsonrpc_handler);
	if (g_jsonrpc_server == NULL) {
		SPDK_ERRLOG("spdk_jsonrpc_server_listen() failed\n");
		/* We hold the lock here, so the lock file is ours to remove. */
		close(g_rpc_lock_fd);
		g_rpc_lock_fd = -1;
		unlink(g_rpc_lock_path);
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	return 0;
}
jsonrpc_handler根据method name查找已注册的对应method,并调用其func处理rpc request:
/*
 * Dispatch one JSON-RPC request to its registered handler.
 *
 * Looks the method up by name, follows an alias to its target (logging a
 * deprecation warning once per deprecated alias), and calls the handler when
 * the method's state_mask covers the current g_rpc_state. Otherwise an error
 * response is sent: METHOD_NOT_FOUND for an unknown method, INVALID_STATE
 * when the method is not allowed in the current state.
 */
static void
jsonrpc_handler(struct spdk_jsonrpc_request *request,
		const struct spdk_json_val *method,
		const struct spdk_json_val *params)
{
	struct spdk_rpc_method *rpc;

	assert(method != NULL);

	rpc = _get_rpc_method(method);
	if (rpc == NULL) {
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND, "Method not found");
		return;
	}

	/* Resolve an alias to the method it stands for, warning once if deprecated. */
	if (rpc->is_alias_of != NULL) {
		if (rpc->is_deprecated && !rpc->deprecation_warning_printed) {
			SPDK_WARNLOG("RPC method %s is deprecated. Use %s instead.\n", rpc->name, rpc->is_alias_of->name);
			rpc->deprecation_warning_printed = true;
		}
		rpc = rpc->is_alias_of;
	}

	/* Allowed in the current state: run the handler and finish. */
	if ((rpc->state_mask & g_rpc_state) == g_rpc_state) {
		rpc->func(request, params);
		return;
	}

	/* Not allowed: the message depends on which state we are in. */
	if (g_rpc_state == SPDK_RPC_STARTUP) {
		spdk_jsonrpc_send_error_response_fmt(request,
						     SPDK_JSONRPC_ERROR_INVALID_STATE,
						     "Method may only be called after "
						     "framework is initialized "
						     "using framework_start_init RPC.");
		return;
	}

	spdk_jsonrpc_send_error_response_fmt(request,
					     SPDK_JSONRPC_ERROR_INVALID_STATE,
					     "Method may only be called before "
					     "framework is initialized. "
					     "Use --wait-for-rpc command line "
					     "parameter and then issue this RPC "
					     "before the framework_start_init RPC.");
}
spdk_rpc_initialize同时创建一个timed poller: rpc_subsystem_poll,该poller间隔4ms调用一次,当一个rpc request到来之后该poller就会采用json格式进行解析并在解析之后调用jsonrpc_handler执行rpc请求。
Poller:rpc_subsystem_poll
->spdk_rpc_accept
->spdk_jsonrpc_server_poll
->jsonrpc_server_conn_recv
->jsonrpc_parse_request
->parse_single_request
->jsonrpc_server_handle_request
->jsonrpc_handler
结合spdk reactor调用poller机制,简单总结如下:
即绑核的cpu通过reactor调用app thread上注册的rpc timed poller,每隔4ms轮询一次并执行rpc客户端发来的rpc请求。
2、注册jsonrpc_handler执行的methods
jsonrpc_handler支持的methods均通过SPDK_RPC_REGISTER->spdk_rpc_register_method进行注册。
例如:注册bdev_get_bdevs方法
/* Example use: per the SPDK_RPC_REGISTER definition, this expands to a static
 * constructor function named rpc_register_rpc_bdev_get_bdevs that calls
 * spdk_rpc_register_method("bdev_get_bdevs", rpc_bdev_get_bdevs, SPDK_RPC_RUNTIME). */
SPDK_RPC_REGISTER("bdev_get_bdevs", rpc_bdev_get_bdevs, SPDK_RPC_RUNTIME)
/*
 * Register an RPC method at program load time.
 *
 * Expands to a static function named rpc_register_<func>, marked with the
 * constructor attribute at priority 1000, whose body calls
 * spdk_rpc_register_method(method, func, state_mask). Because the function
 * name is built from `func`, each handler can be registered this way only
 * once per translation unit.
 *
 * Give SPDK_RPC_REGISTER a higher execution priority than
 * SPDK_RPC_REGISTER_ALIAS_DEPRECATED to ensure all of the RPCs are registered
 * before we try registering any aliases. Some older versions of clang may
 * otherwise execute the constructors in a different order than
 * defined in the source file (see issue #892).
 */
#define SPDK_RPC_REGISTER(method, func, state_mask) \
static void __attribute__((constructor(1000))) rpc_register_##func(void) \
{ \
spdk_rpc_register_method(method, func, state_mask); \
}
/*
 * Register an RPC method so that it can later be looked up by name.
 *
 * method     - method name; a private copy is made with strdup().
 * func       - handler stored for the method.
 * state_mask - mask stored for the method (compared against the RPC state
 *              when a request is dispatched).
 *
 * A name that is already registered is rejected: an error is logged and
 * g_rpcs_correct is set to false. An allocation failure is reported the same
 * way rather than being checked only by assert(), which compiles out under
 * NDEBUG and would leave a NULL pointer dereference.
 */
void
spdk_rpc_register_method(const char *method, spdk_rpc_method_handler func, uint32_t state_mask)
{
	struct spdk_rpc_method *m;

	m = _get_rpc_method_raw(method);
	if (m != NULL) {
		SPDK_ERRLOG("duplicate RPC %s registered...\n", method);
		g_rpcs_correct = false;
		return;
	}

	m = calloc(1, sizeof(*m));
	if (m == NULL) {
		SPDK_ERRLOG("Unable to allocate memory for RPC %s\n", method);
		g_rpcs_correct = false;
		return;
	}

	m->name = strdup(method);
	if (m->name == NULL) {
		SPDK_ERRLOG("Unable to allocate memory for RPC %s\n", method);
		free(m);
		g_rpcs_correct = false;
		return;
	}

	m->func = func;
	m->state_mask = state_mask;

	/* TODO: use a hash table or sorted list */
	SLIST_INSERT_HEAD(&g_rpc_methods, m, slist);
}
从上述method注册代码中可以看出,所有注册的methods都保存在全局链表g_rpc_methods中;处理rpc请求时,按method name在g_rpc_methods中查找对应的method,并调用其func执行相关请求。
当前spdk服务注册的methods方法可以通过spdk_rpc.py rpc_get_methods查询,例如:
3、rpc client端:通过spdk_rpc.py执行某个rpc请求即可。rpc server服务正常启动之后,就可以调用spdk服务支持的method,各method的具体用法可以通过--help查看。
例如:查询bdev_get_bdevs的用法