searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

SPDK RPC简析

2023-08-22 01:29:35
264
0

SPDK实现一种基于json-rpc的服务,通过client-server方式进行服务的配置、运行状态监控等。

spdk rpc机制主要分为三部分:

1、rpc server端服务启动

当spdk服务如nvmf_tgt或app启动时会调用spdk_app_start,spdk_app_start会创建一个名为“app_thread”的线程,在该线程中通过调用spdk_rpc_initialize实现rpc server端服务的启动,主要包括启动socket监听并注册连接事件处理handler,在app thread线程注册timed poller:rpc_subsystem_poll。

int  spdk_rpc_initialize(const char *listen_addr)
{
	int rc;

	if (listen_addr == NULL) {
		/* Not treated as an error */
		return 0;
	}

	if (!spdk_rpc_verify_methods()) {
		return -EINVAL;
	}

	/* Listen on the requested address */
	rc = spdk_rpc_listen(listen_addr);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to start RPC service at %s\n", listen_addr);
		/* TODO: Eventually, treat this as an error. But it historically has not
		 * been and many tests rely on this gracefully failing. */
		return 0;
	}

	spdk_rpc_set_state(SPDK_RPC_STARTUP);

	/* Register a poller to periodically check for RPCs */
	g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll, NULL, RPC_SELECT_INTERVAL);

	return 0;
}

在spdk_rpc_initialize中spdk_rpc_listen主要用于启动socket服务的监听,注册jsonrpc_handler去处理rpc请求。

int spdk_rpc_listen(const char *listen_addr)
{
	int rc;

	memset(&g_rpc_listen_addr_unix, 0, sizeof(g_rpc_listen_addr_unix));

	g_rpc_listen_addr_unix.sun_family = AF_UNIX;
	rc = snprintf(g_rpc_listen_addr_unix.sun_path,
		      sizeof(g_rpc_listen_addr_unix.sun_path),
		      "%s", listen_addr);
	if (rc < 0 || (size_t)rc >= sizeof(g_rpc_listen_addr_unix.sun_path)) {
		SPDK_ERRLOG("RPC Listen address Unix socket path too long\n");
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		return -1;
	}

	rc = snprintf(g_rpc_lock_path, sizeof(g_rpc_lock_path), "%s.lock",
		      g_rpc_listen_addr_unix.sun_path);
	if (rc < 0 || (size_t)rc >= sizeof(g_rpc_lock_path)) {
		SPDK_ERRLOG("RPC lock path too long\n");
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	g_rpc_lock_fd = open(g_rpc_lock_path, O_RDWR | O_CREAT, 0600);
	if (g_rpc_lock_fd == -1) {
		SPDK_ERRLOG("Cannot open lock file %s: %s\n",
			    g_rpc_lock_path, spdk_strerror(errno));
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	rc = flock(g_rpc_lock_fd, LOCK_EX | LOCK_NB);
	if (rc != 0) {
		SPDK_ERRLOG("RPC Unix domain socket path %s in use. Specify another.\n",
			    g_rpc_listen_addr_unix.sun_path);
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	/*
	 * Since we acquired the lock, it is safe to delete the Unix socket file
	 * if it still exists from a previous process.
	 */
	unlink(g_rpc_listen_addr_unix.sun_path);

	g_jsonrpc_server = spdk_jsonrpc_server_listen(AF_UNIX, 0,
			   (struct sockaddr *)&g_rpc_listen_addr_unix,
			   sizeof(g_rpc_listen_addr_unix),
			   jsonrpc_handler);
	if (g_jsonrpc_server == NULL) {
		SPDK_ERRLOG("spdk_jsonrpc_server_listen() failed\n");
		close(g_rpc_lock_fd);
		g_rpc_lock_fd = -1;
		unlink(g_rpc_lock_path);
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	return 0;
}

jsonrpc_handler根据method name查询请求对应注册method,调用对应func处理rpc request:

static void  jsonrpc_handler(struct spdk_jsonrpc_request *request,
		const struct spdk_json_val *method,
		const struct spdk_json_val *params)
{
	struct spdk_rpc_method *m;

	assert(method != NULL);

	m = _get_rpc_method(method);
	if (m == NULL) {
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND, "Method not found");
		return;
	}

	if (m->is_alias_of != NULL) {
		if (m->is_deprecated && !m->deprecation_warning_printed) {
			SPDK_WARNLOG("RPC method %s is deprecated.  Use %s instead.\n", m->name, m->is_alias_of->name);
			m->deprecation_warning_printed = true;
		}
		m = m->is_alias_of;
	}

	if ((m->state_mask & g_rpc_state) == g_rpc_state) {
		m->func(request, params);
	} else {
		if (g_rpc_state == SPDK_RPC_STARTUP) {
			spdk_jsonrpc_send_error_response_fmt(request,
							     SPDK_JSONRPC_ERROR_INVALID_STATE,
							     "Method may only be called after "
							     "framework is initialized "
							     "using framework_start_init RPC.");
		} else {
			spdk_jsonrpc_send_error_response_fmt(request,
							     SPDK_JSONRPC_ERROR_INVALID_STATE,
							     "Method may only be called before "
							     "framework is initialized. "
							     "Use --wait-for-rpc command line "
							     "parameter and then issue this RPC "
							     "before the framework_start_init RPC.");
		}
	}
}

spdk_rpc_initialize同时创建一个timed poller: rpc_subsystem_poll,该poller间隔4ms调用一次,当一个rpc request到来之后该poller就会采用json格式进行解析并在解析之后调用jsonrpc_handler执行rpc请求。

Poller:rpc_subsystem_poll

->spdk_rpc_accept

   ->spdk_rpc_accept

       ->spdk_jsonrpc_server_poll

           ->jsonrpc_server_conn_recv

               ->jsonrpc_parse_request

                  ->parse_single_request

                      ->jsonrpc_server_handle_request

                          ->jsonrpc_handler

结合spdk reactor调用poller机制,简单总结如下:

即绑核cpu通过reactor调用app thread上rpc timed poller,间隔4ms内执行rpc客户端发来的rpc请求。

2、注册jsonrpc_handler执行的methods

jsonrpc_handler支持的methods均通过SPDK_RPC_REGISTER->spdk_rpc_register_method进行注册。

例如:注册bdev_get_bdevs方法

SPDK_RPC_REGISTER("bdev_get_bdevs", rpc_bdev_get_bdevs, SPDK_RPC_RUNTIME)

/* Give SPDK_RPC_REGISTER a higher execution priority than
 * SPDK_RPC_REGISTER_ALIAS_DEPRECATED to ensure all of the RPCs are registered
 * before we try registering any aliases.  Some older versions of clang may
 * otherwise execute the constructors in a different order than
 * defined in the source file (see issue #892).
 */
#define SPDK_RPC_REGISTER(method, func, state_mask) \
static void __attribute__((constructor(1000))) rpc_register_##func(void) \
{ \
	spdk_rpc_register_method(method, func, state_mask); \
}

void spdk_rpc_register_method(const char *method, spdk_rpc_method_handler func, uint32_t state_mask)
{
	struct spdk_rpc_method *m;

	m = _get_rpc_method_raw(method);
	if (m != NULL) {
		SPDK_ERRLOG("duplicate RPC %s registered...\n", method);
		g_rpcs_correct = false;
		return;
	}

	m = calloc(1, sizeof(struct spdk_rpc_method));
	assert(m != NULL);

	m->name = strdup(method);
	assert(m->name != NULL);

	m->func = func;
	m->state_mask = state_mask;

	/* TODO: use a hash table or sorted list */
	SLIST_INSERT_HEAD(&g_rpc_methods, m, slist);

从上述method注册代码中可以看出,所有注册的接口所有的methods存在全局变量:g_rpc_methods,处理rpc请求时查询g_rpc_methods对应的method name找到对应的func执行相关请求。

当前spdk服务注册的methods方法可以通过spdk_rpc.py rpc_get_methods查询,例如:

3、rpc client端:spdk_rpc.py执行某个rpc请求即可。当rpc server正常服务启动之后,可以执行spdk服务支持的method方法,method的具体用法可以通过--help查看

例如:查询bdev_get_bdevs的用法

0条评论
0 / 1000
l****n
5文章数
0粉丝数
l****n
5 文章 | 0 粉丝
原创

SPDK RPC简析

2023-08-22 01:29:35
264
0

SPDK实现一种基于json-rpc的服务,通过client-server方式进行服务的配置、运行状态监控等。

spdk rpc机制主要分为三部分:

1、rpc server端服务启动

当spdk服务如nvmf_tgt或app启动时会调用spdk_app_start,spdk_app_start会创建一个名为“app_thread”的线程,在该线程中通过调用spdk_rpc_initialize实现rpc server端服务的启动,主要包括启动socket监听并注册连接事件处理handler,在app thread线程注册timed poller:rpc_subsystem_poll。

int  spdk_rpc_initialize(const char *listen_addr)
{
	int rc;

	if (listen_addr == NULL) {
		/* Not treated as an error */
		return 0;
	}

	if (!spdk_rpc_verify_methods()) {
		return -EINVAL;
	}

	/* Listen on the requested address */
	rc = spdk_rpc_listen(listen_addr);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to start RPC service at %s\n", listen_addr);
		/* TODO: Eventually, treat this as an error. But it historically has not
		 * been and many tests rely on this gracefully failing. */
		return 0;
	}

	spdk_rpc_set_state(SPDK_RPC_STARTUP);

	/* Register a poller to periodically check for RPCs */
	g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll, NULL, RPC_SELECT_INTERVAL);

	return 0;
}

在spdk_rpc_initialize中spdk_rpc_listen主要用于启动socket服务的监听,注册jsonrpc_handler去处理rpc请求。

int spdk_rpc_listen(const char *listen_addr)
{
	int rc;

	memset(&g_rpc_listen_addr_unix, 0, sizeof(g_rpc_listen_addr_unix));

	g_rpc_listen_addr_unix.sun_family = AF_UNIX;
	rc = snprintf(g_rpc_listen_addr_unix.sun_path,
		      sizeof(g_rpc_listen_addr_unix.sun_path),
		      "%s", listen_addr);
	if (rc < 0 || (size_t)rc >= sizeof(g_rpc_listen_addr_unix.sun_path)) {
		SPDK_ERRLOG("RPC Listen address Unix socket path too long\n");
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		return -1;
	}

	rc = snprintf(g_rpc_lock_path, sizeof(g_rpc_lock_path), "%s.lock",
		      g_rpc_listen_addr_unix.sun_path);
	if (rc < 0 || (size_t)rc >= sizeof(g_rpc_lock_path)) {
		SPDK_ERRLOG("RPC lock path too long\n");
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	g_rpc_lock_fd = open(g_rpc_lock_path, O_RDWR | O_CREAT, 0600);
	if (g_rpc_lock_fd == -1) {
		SPDK_ERRLOG("Cannot open lock file %s: %s\n",
			    g_rpc_lock_path, spdk_strerror(errno));
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	rc = flock(g_rpc_lock_fd, LOCK_EX | LOCK_NB);
	if (rc != 0) {
		SPDK_ERRLOG("RPC Unix domain socket path %s in use. Specify another.\n",
			    g_rpc_listen_addr_unix.sun_path);
		g_rpc_listen_addr_unix.sun_path[0] = '\0';
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	/*
	 * Since we acquired the lock, it is safe to delete the Unix socket file
	 * if it still exists from a previous process.
	 */
	unlink(g_rpc_listen_addr_unix.sun_path);

	g_jsonrpc_server = spdk_jsonrpc_server_listen(AF_UNIX, 0,
			   (struct sockaddr *)&g_rpc_listen_addr_unix,
			   sizeof(g_rpc_listen_addr_unix),
			   jsonrpc_handler);
	if (g_jsonrpc_server == NULL) {
		SPDK_ERRLOG("spdk_jsonrpc_server_listen() failed\n");
		close(g_rpc_lock_fd);
		g_rpc_lock_fd = -1;
		unlink(g_rpc_lock_path);
		g_rpc_lock_path[0] = '\0';
		return -1;
	}

	return 0;
}

jsonrpc_handler根据method name查询请求对应注册method,调用对应func处理rpc request:

static void  jsonrpc_handler(struct spdk_jsonrpc_request *request,
		const struct spdk_json_val *method,
		const struct spdk_json_val *params)
{
	struct spdk_rpc_method *m;

	assert(method != NULL);

	m = _get_rpc_method(method);
	if (m == NULL) {
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND, "Method not found");
		return;
	}

	if (m->is_alias_of != NULL) {
		if (m->is_deprecated && !m->deprecation_warning_printed) {
			SPDK_WARNLOG("RPC method %s is deprecated.  Use %s instead.\n", m->name, m->is_alias_of->name);
			m->deprecation_warning_printed = true;
		}
		m = m->is_alias_of;
	}

	if ((m->state_mask & g_rpc_state) == g_rpc_state) {
		m->func(request, params);
	} else {
		if (g_rpc_state == SPDK_RPC_STARTUP) {
			spdk_jsonrpc_send_error_response_fmt(request,
							     SPDK_JSONRPC_ERROR_INVALID_STATE,
							     "Method may only be called after "
							     "framework is initialized "
							     "using framework_start_init RPC.");
		} else {
			spdk_jsonrpc_send_error_response_fmt(request,
							     SPDK_JSONRPC_ERROR_INVALID_STATE,
							     "Method may only be called before "
							     "framework is initialized. "
							     "Use --wait-for-rpc command line "
							     "parameter and then issue this RPC "
							     "before the framework_start_init RPC.");
		}
	}
}

spdk_rpc_initialize同时创建一个timed poller: rpc_subsystem_poll,该poller间隔4ms调用一次,当一个rpc request到来之后该poller就会采用json格式进行解析并在解析之后调用jsonrpc_handler执行rpc请求。

Poller:rpc_subsystem_poll

->spdk_rpc_accept

   ->spdk_rpc_accept

       ->spdk_jsonrpc_server_poll

           ->jsonrpc_server_conn_recv

               ->jsonrpc_parse_request

                  ->parse_single_request

                      ->jsonrpc_server_handle_request

                          ->jsonrpc_handler

结合spdk reactor调用poller机制,简单总结如下:

即绑核cpu通过reactor调用app thread上rpc timed poller,间隔4ms内执行rpc客户端发来的rpc请求。

2、注册jsonrpc_handler执行的methods

jsonrpc_handler支持的methods均通过SPDK_RPC_REGISTER->spdk_rpc_register_method进行注册。

例如:注册bdev_get_bdevs方法

SPDK_RPC_REGISTER("bdev_get_bdevs", rpc_bdev_get_bdevs, SPDK_RPC_RUNTIME)

/* Give SPDK_RPC_REGISTER a higher execution priority than
 * SPDK_RPC_REGISTER_ALIAS_DEPRECATED to ensure all of the RPCs are registered
 * before we try registering any aliases.  Some older versions of clang may
 * otherwise execute the constructors in a different order than
 * defined in the source file (see issue #892).
 */
#define SPDK_RPC_REGISTER(method, func, state_mask) \
static void __attribute__((constructor(1000))) rpc_register_##func(void) \
{ \
	spdk_rpc_register_method(method, func, state_mask); \
}

void spdk_rpc_register_method(const char *method, spdk_rpc_method_handler func, uint32_t state_mask)
{
	struct spdk_rpc_method *m;

	m = _get_rpc_method_raw(method);
	if (m != NULL) {
		SPDK_ERRLOG("duplicate RPC %s registered...\n", method);
		g_rpcs_correct = false;
		return;
	}

	m = calloc(1, sizeof(struct spdk_rpc_method));
	assert(m != NULL);

	m->name = strdup(method);
	assert(m->name != NULL);

	m->func = func;
	m->state_mask = state_mask;

	/* TODO: use a hash table or sorted list */
	SLIST_INSERT_HEAD(&g_rpc_methods, m, slist);

从上述method注册代码中可以看出,所有注册的接口所有的methods存在全局变量:g_rpc_methods,处理rpc请求时查询g_rpc_methods对应的method name找到对应的func执行相关请求。

当前spdk服务注册的methods方法可以通过spdk_rpc.py rpc_get_methods查询,例如:

3、rpc client端:spdk_rpc.py执行某个rpc请求即可。当rpc server正常服务启动之后,可以执行spdk服务支持的method方法,method的具体用法可以通过--help查看

例如:查询bdev_get_bdevs的用法

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0