Nova Service Startup and How RPC Calls Are Received

2023-09-26 09:27:55

The Nova components call each other through the RPC framework provided by oslo.messaging. An RPC call needs the target's transport_url (the IP address and port of the message queue) and a topic; the topic names the service being called, e.g. "conductor". Each service is started by running its nova.cmd.* module, which holds the service's main() entry point. Taking nova.cmd.conductor.py as an example, main() contains:

...
server = service.Service.create(binary='nova-conductor',
                                    topic=CONF.conductor.topic)
...
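
To make the transport_url / topic pair concrete, here is a minimal client-side sketch using plain oslo.messaging. It is only an illustration: the broker URL and the context dict are made up, and a real caller (nova-api) goes through conductor's rpcapi wrapper rather than building an RPCClient by hand:

import oslo_messaging as messaging
from oslo_config import cfg

CONF = cfg.CONF
CONF([], project='demo')

# transport_url: where the message queue lives (host, port, credentials)
transport = messaging.get_rpc_transport(
    CONF, url='rabbit://guest:guest@192.0.2.10:5672/')

# topic selects the service; namespace/version must match the Target that
# nova-conductor's endpoint declares (shown later in this article)
target = messaging.Target(topic='conductor',
                          namespace='compute_task', version='1.20')
client = messaging.RPCClient(transport, target)

# a real caller passes a serialized nova RequestContext; an empty dict
# stands in for it here.  cast() is fire-and-forget, call() would block
# for a return value.
client.cast({}, 'build_instances',
            instances=[], image=None, filter_properties={},
            admin_password=None, injected_files=[],
            requested_networks=None, security_groups=[])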

The service.Service.create() call above builds the nova-conductor service: it invokes the create() classmethod of the Service class in nova.service.py, which assembles a new service from the given arguments. After resolving the relevant parameters, create() instantiates a new Service object and returns it:

service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_enable=periodic_enable,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          periodic_interval_max=periodic_interval_max)

return service_obj

Here cls refers to the Service class itself (create() is a classmethod, not a static method).

Back in nova.cmd.conductor.py, the next step is:

service.serve(server, workers=workers)

This calls the serve() function in nova.service.py, which uses oslo.service to build a module-level _launcher object:

_launcher = service.launch(CONF, server, workers=workers,
                               restart_method='mutate')

service.launch() constructs the appropriate launcher (ServiceLauncher for a single worker, ProcessLauncher otherwise) and calls its launch_service() method:

if workers is None or workers == 1:
    launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
    launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)

ProcessLauncher.launch_service() then wraps the service in a ServiceWrapper and calls _start_child(wrap) once per worker:

def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

        :param service: a service to launch, must be an instance of
                        :class:`oslo_service.service.ServiceBase`
        :param workers: a number of processes in which a service
                        will be running
        """
        _check_service_base(service)
        wrap = ServiceWrapper(service, workers)
        ...
        LOG.info('Starting %d workers', wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)

 

_start_child() forks a new child process; the child starts the service and then loops, waiting for the service to exit or for a signal:

 

def _start_child(self, wrap):
        ...
        pid = os.fork()
        if pid == 0:
            # the service (wrap.service) is started in this child process
            self.launcher = self._child_process(wrap.service)
            while True:
                self._child_process_handle_signal()
                status, signo = self._child_wait_for_exit_or_signal(
                    self.launcher)
                if not _is_sighup_and_daemon(signo):
                    self.launcher.wait()
                    break
                self.launcher.restart()
            os._exit(status)

_child_process() creates a plain Launcher and calls its launch_service(service) method:

def _child_process(self, service):
        ...
        launcher = Launcher(self.conf, restart_method=self.restart_method)
        launcher.launch_service(service)
        return launcher

 

Launcher.launch_service() in turn executes:

 

self.services.add(service)

Here self.services is a Services object that keeps the list of running services; its add() method calls self.tg.add_thread():

def add(self, service):
        """Add a service to a list and create a thread to run it.
        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)

self.tg is an oslo.service ThreadGroup, and its add_thread() method looks like this:

def add_thread(self, callback, *args, **kwargs):
        """Spawn a new thread.

        This call will block until capacity is available in the thread pool.
        After that, it returns immediately (i.e. *before* the new thread is
        scheduled).

        :param callback: the function to run in the new thread.
        :param args: positional arguments to the callback function.
        :param kwargs: keyword arguments to the callback function.
        :returns: a :class:`Thread` object
        """
        gt = self.pool.spawn(callback, *args, **kwargs)
        th = Thread(gt, self, link=False)
        self.threads.append(th)
        gt.link(_on_thread_done, self, th)
        return th
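
For readers unfamiliar with eventlet, the spawn/link pattern used by add_thread() boils down to the following standalone sketch (toy function names, not oslo code):

import eventlet

pool = eventlet.GreenPool(size=10)

def run_service(name):
    # stands in for Services.run_service: start the service, then wait
    print('starting %s' % name)

def on_done(gt, name):
    # link() callbacks fire once the green thread has finished
    print('%s finished' % name)

gt = pool.spawn(run_service, 'nova-conductor')  # a cooperative green thread
gt.link(on_done, 'nova-conductor')
pool.waitall()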

In short, add_thread() spawns a new green thread and runs the callback in it; the callback here is self.run_service. run_service(service, done) is a static method of the Services class in oslo.service:

@staticmethod
    def run_service(service, done):
        """Service start wrapper.

        :param service: service to run
        :param done: event to wait on until a shutdown is triggered
        :returns: None

        """
        try:
            # the actual service is started here
            service.start()
        except Exception:
            LOG.exception('Error starting thread.')
            raise SystemExit(1)
        else:
            done.wait()

So the chain ends with service.start(), which is what actually starts the service.
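
To tie the whole launch chain together, the same pattern can be reproduced with a minimal, self-contained oslo.service example (a hypothetical HelloService, not Nova code); the launcher ends up calling start() exactly as traced above:

from oslo_config import cfg
from oslo_service import service


class HelloService(service.Service):
    # nova.service.Service plays this role in Nova
    def start(self):
        super(HelloService, self).start()
        # in Nova this is where the RPC server is built and started
        # (see the next section)
        print('service started')


if __name__ == '__main__':
    conf = cfg.CONF
    conf([], project='hello')
    # workers > 1 selects ProcessLauncher, which forks children via
    # _start_child() exactly as shown above
    launcher = service.launch(conf, HelloService(), workers=2,
                              restart_method='mutate')
    launcher.wait()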

To see what service.start() does for the conductor, we have to look at nova.conductor.manager.py. That module defines the ConductorManager class, which at initialization time stores a ComputeTaskManager instance (defined in the same file) in its additional_endpoints list; those endpoints are what the RPC server exposes once start() runs. ComputeTaskManager contains the logic that actually handles the API requests sent to the conductor service. Its code looks like this:

@profiler.trace_cls("rpc")
class ComputeTaskManager(base.Base):
    """Namespace for compute methods.

    This class presents an rpc API for nova-conductor under the 'compute_task'
    namespace.  The methods here are compute operations that are invoked
    by the API service.  These methods see the operation to completion, which
    may involve coordinating activities on multiple compute nodes.
    """

    target = messaging.Target(namespace='compute_task', version='1.20')

    def __init__(self):
        super(ComputeTaskManager, self).__init__()
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        self.volume_api = cinder.API()
        self.image_api = image.API()
        self.network_api = network.API()
        self.servicegroup_api = servicegroup.API()
        self.scheduler_client = scheduler_client.SchedulerClient()
        self.report_client = self.scheduler_client.reportclient
        self.notifier = rpc.get_notifier('compute', CONF.host)
...

    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True,
            request_spec=None, host_lists=None):
...
    
    def _schedule_instances(self, context, request_spec,
                            instance_uuids=None, return_alternates=False):
        scheduler_utils.setup_instance_group(context, request_spec)
        host_lists = self.scheduler_client.select_destinations(context,
                request_spec, instance_uuids, return_objects=True,
                return_alternates=return_alternates)
        return host_lists
...
    def schedule_and_build_instances(self, context, build_requests,
                                     request_specs, image,
                                     admin_password, injected_files,
                                     requested_networks, block_device_mapping,
                                     tags=None):
...
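
For completeness, the registration itself happens in ConductorManager.__init__; roughly, paraphrased from nova.conductor.manager.py (abridged, details vary between releases):

class ConductorManager(manager.Manager):

    def __init__(self, *args, **kwargs):
        super(ConductorManager, self).__init__(service_name='conductor',
                                               *args, **kwargs)
        self.compute_task_mgr = ComputeTaskManager()
        # additional_endpoints is what Service.start() later merges into
        # the RPC server's endpoint list (see below)
        self.additional_endpoints.append(self.compute_task_mgr)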

 

With ComputeTaskManager registered as an additional endpoint, Service.start() (in nova.service.py) then does the following:

...
        LOG.debug("Creating RPC server for service %s", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)

        endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
        ]
        # the manager's additional endpoints (ComputeTaskManager) are merged in here
        endpoints.extend(self.manager.additional_endpoints)

        serializer = objects_base.NovaObjectSerializer()

        # build the rpcserver object, then start it
        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()
...

 

get_server() lives in nova.rpc.py:

 

def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None

    if profiler:
        serializer = ProfilerRequestContextSerializer(serializer)
    else:
        serializer = RequestContextSerializer(serializer)
    access_policy = dispatcher.DefaultRPCAccessPolicy
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer,
                                    access_policy=access_policy)

Following into messaging.get_rpc_server() (the module-level TRANSPORT asserted above is created earlier during service startup, when nova.rpc is initialized from the configured transport_url):

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None, access_policy=None):
    """Construct an RPC server.

    :param transport: the messaging transport
    :type transport: Transport
    :param target: the exchange, topic and server to listen on
    :type target: Target
    :param endpoints: a list of endpoint objects
    :type endpoints: list
    :param executor: name of message executor - available values are
                     'eventlet' and 'threading'
    :type executor: str
    :param serializer: an optional entity serializer
    :type serializer: Serializer
    :param access_policy: an optional access policy.
           Defaults to DefaultRPCAccessPolicy
    :type access_policy: RPCAccessPolicyBase
    """
    dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer,
                                              access_policy)
    return RPCServer(transport, target, dispatcher, executor)

The RPCDispatcher is what routes an incoming RPC request to the matching endpoint: it forwards the named RPC method to the corresponding method on one of the endpoint objects, e.g. live_migrate_instance on ComputeTaskManager. Its dispatch() method is shown below:

 

def dispatch(self, incoming):
        """Dispatch an RPC message to the appropriate endpoint method.

        :param incoming: incoming message
        :type incoming: IncomingMessage
        :raises: NoSuchMethod, UnsupportedVersion
        """
        message = incoming.message
        ctxt = incoming.ctxt

        method = message.get('method')
        args = message.get('args', {})
        namespace = message.get('namespace')
        version = message.get('version', '1.0')

        found_compatible = False
        for endpoint in self.endpoints:
            target = getattr(endpoint, 'target', None)
            if not target:
                target = self._default_target

            if not (self._is_namespace(target, namespace) and
                    self._is_compatible(target, version)):
                continue

            if hasattr(endpoint, method):
                if self.access_policy.is_allowed(endpoint, method):
                    return self._do_dispatch(endpoint, method, ctxt, args)

            found_compatible = True

        # no endpoint could handle the call
        if found_compatible:
            raise NoSuchMethod(method)
        else:
            raise UnsupportedVersion(version, method=method)
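
Putting the pieces together, the server-side pattern traced in this article (a Target, a list of endpoint objects, get_rpc_server(), start()) can be reproduced with plain oslo.messaging. The sketch below uses hypothetical names (DemoEndpoint, the 'demo' topic, the broker URL) and is not Nova code:

import time

import oslo_messaging as messaging
from oslo_config import cfg

CONF = cfg.CONF


class DemoEndpoint(object):
    # plays the role of ComputeTaskManager: public methods become RPC methods
    target = messaging.Target(namespace='demo_task', version='1.0')

    def ping(self, ctxt, payload):
        # dispatch() routes {'method': 'ping', ...} messages here
        return {'pong': payload}


def main():
    CONF([], project='demo')
    transport = messaging.get_rpc_transport(
        CONF, url='rabbit://guest:guest@192.0.2.10:5672/')
    # topic + server play the same role as in nova.service.Service.start()
    target = messaging.Target(topic='demo', server='host-1')
    server = messaging.get_rpc_server(transport, target, [DemoEndpoint()],
                                      executor='threading')
    server.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    server.stop()
    server.wait()


if __name__ == '__main__':
    main()

At this point the path is complete: the service process starts, the launcher calls Service.start(), start() builds an RPC server around the manager's endpoints, and the dispatcher turns each incoming message into a call on one of those endpoint methods.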

 
