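Nova's components call one another through the RPC framework implemented by oslo.messaging. An RPC call must specify the target's transport_url (the IP address and port of the target message queue) and a topic. The topic identifies the specific service, for example "conductor". Each service is started by running the matching module under nova.cmd.*, which holds that service's main entry point. Taking nova/cmd/conductor.py as an example, its main function contains: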
...
server = service.Service.create(binary='nova-conductor',
                                topic=CONF.conductor.topic)
...
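This call creates the nova-conductor service. It invokes the create class method of the Service class in nova/service.py, which builds a new service from the given parameters. After resolving those parameters, create instantiates a new Service object and returns it: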
service_obj = cls(host, binary, topic, manager,
                  report_interval=report_interval,
                  periodic_enable=periodic_enable,
                  periodic_fuzzy_delay=periodic_fuzzy_delay,
                  periodic_interval_max=periodic_interval_max)
return service_obj
Here cls refers to the Service class itself, because create is a class method.
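The factory pattern described above can be sketched in a few lines. This is a minimal illustration, not Nova's code: MiniService, the "localhost" default, and the way the topic and manager names are derived from the binary name are all assumptions made for the example.

```python
import os
import sys


class MiniService:
    """Hypothetical stand-in for a service class (illustration only)."""

    def __init__(self, host, binary, topic, manager):
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager = manager

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None):
        # Fill in a default for every argument the caller left out.
        host = host or "localhost"  # assumption: real code would read config
        binary = binary or os.path.basename(sys.argv[0])
        # Assumed convention: 'nova-conductor' maps to topic 'conductor'.
        topic = topic or binary.rpartition("nova-")[2]
        manager = manager or f"{topic}-manager"  # placeholder manager name
        # cls is the class itself, so a subclass gets an instance of itself.
        return cls(host, binary, topic, manager)


svc = MiniService.create(binary="nova-conductor")
print(svc.topic, svc.manager)
```

Because create receives the class as cls, the same factory works unchanged for any subclass.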
Next, nova/cmd/conductor.py executes:
service.serve(server, workers=workers)
This calls the serve function in nova/service.py, which uses oslo.service to build a module-level _launcher object:
_launcher = service.launch(CONF, server, workers=workers,
                           restart_method='mutate')
service.launch builds a launcher object and calls its launch_service method:
if workers is None or workers == 1:
    launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
    launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)
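The selection logic above can be tried out in isolation. The sketch below is only an illustration: InProcessLauncher and ForkingLauncher are hypothetical stand-ins that return a description instead of running anything, and the check for non-positive worker counts is an assumption added for the example.

```python
class InProcessLauncher:
    """Hypothetical launcher that would run the service in this process."""

    def launch_service(self, service, workers=1):
        return f"{service}: running in the current process"


class ForkingLauncher:
    """Hypothetical launcher that would fork one child per worker."""

    def launch_service(self, service, workers=1):
        return f"{service}: running in {workers} child processes"


def launch(service, workers=1):
    # Assumed guard: a zero or negative worker count makes no sense.
    if workers is not None and workers <= 0:
        raise ValueError("number of workers must be positive")
    # One worker (or None) stays in-process; more workers need forking.
    if workers is None or workers == 1:
        launcher = InProcessLauncher()
    else:
        launcher = ForkingLauncher()
    return launcher, launcher.launch_service(service, workers=workers)


launcher, description = launch("nova-conductor", workers=4)
print(type(launcher).__name__, "->", description)
```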
When more than one worker is requested, the launcher is a ProcessLauncher. Its launch_service method builds a ServiceWrapper and then calls the launcher's _start_child(wrap) method:
def launch_service(self, service, workers=1):
    """Launch a service with a given number of workers.

    :param service: a service to launch, must be an instance of
           :class:`oslo_service.service.ServiceBase`
    :param workers: a number of processes in which a service
           will be running
    """
    _check_service_base(service)
    wrap = ServiceWrapper(service, workers)
    ...
    LOG.info('Starting %d workers', wrap.workers)
    while self.running and len(wrap.children) < wrap.workers:
        self._start_child(wrap)
_start_child forks a new child process. The child starts the service and then loops, waiting for the service to exit or for a signal to arrive:
def _start_child(self, wrap):
    ...
    pid = os.fork()
    if pid == 0:
        # wrap.service is started on this line
        self.launcher = self._child_process(wrap.service)
        while True:
            self._child_process_handle_signal()
            status, signo = self._child_wait_for_exit_or_signal(
                self.launcher)
            if not _is_sighup_and_daemon(signo):
                self.launcher.wait()
                break
            self.launcher.restart()
        os._exit(status)
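The fork-then-exit pattern used by _start_child can be reduced to a small POSIX-only sketch. run_in_child is a hypothetical helper written for this example; it assumes the work function returns a small integer to use as the exit status, and it leaves out the signal handling and restart loop of the real code.

```python
import os


def run_in_child(target):
    """Fork, run target() in the child, and return the child's exit status."""
    pid = os.fork()
    if pid == 0:
        # Child process: do the work, then leave immediately. os._exit
        # skips the parent's cleanup handlers and never returns to the caller.
        status = 1
        try:
            status = int(target())
        finally:
            os._exit(status)
    # Parent process: block until this particular child has exited.
    _, wait_status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(wait_status)


print(run_in_child(lambda: 3))
```

The pid == 0 branch is the only code the child ever runs, which is why the real method also ends that branch with os._exit(status).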
_child_process in turn calls launcher.launch_service(service):
def _child_process(self, service):
    ...
    launcher = Launcher(self.conf, restart_method=self.restart_method)
    launcher.launch_service(service)
    return launcher
Launcher.launch_service then executes:
self.services.add(service)
Here self.services is a Services object that keeps the list of services. Its add method calls self.tg.add_thread():
def add(self, service):
    """Add a service to a list and create a thread to run it.

    :param service: service to run
    """
    self.services.append(service)
    self.tg.add_thread(self.run_service, service, self.done)
self.tg.add_thread() is defined as follows:
def add_thread(self, callback, *args, **kwargs):
    """Spawn a new thread.

    This call will block until capacity is available in the thread pool.
    After that, it returns immediately (i.e. *before* the new thread is
    scheduled).

    :param callback: the function to run in the new thread.
    :param args: positional arguments to the callback function.
    :param kwargs: keyword arguments to the callback function.
    :returns: a :class:`Thread` object
    """
    gt = self.pool.spawn(callback, *args, **kwargs)
    th = Thread(gt, self, link=False)
    self.threads.append(th)
    gt.link(_on_thread_done, self, th)
    return th
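The spawn-track-unlink bookkeeping in add_thread can be imitated with the standard library. This is a rough analogue under stated assumptions, not the oslo.service implementation: MiniThreadGroup is a hypothetical class, it uses OS threads from concurrent.futures rather than green threads, and submit queues work instead of blocking when the pool is full.

```python
from concurrent.futures import ThreadPoolExecutor


class MiniThreadGroup:
    """Hypothetical thread group backed by a bounded standard-library pool."""

    def __init__(self, size=4):
        self.pool = ThreadPoolExecutor(max_workers=size)
        self.threads = []

    def add_thread(self, callback, *args, **kwargs):
        # Hand the callback to the pool and remember the running task.
        future = self.pool.submit(callback, *args, **kwargs)
        self.threads.append(future)
        # Similar in spirit to gt.link(...): forget the task once it ends.
        future.add_done_callback(self._on_thread_done)
        return future

    def _on_thread_done(self, future):
        if future in self.threads:
            self.threads.remove(future)

    def wait(self):
        # Block until every submitted task has finished.
        self.pool.shutdown(wait=True)


tg = MiniThreadGroup()
task = tg.add_thread(pow, 2, 5)
tg.wait()
print(task.result(), len(tg.threads))
```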
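This spawns a new thread (a green thread, since the pool is an eventlet GreenPool) and runs callback in it, which here is the self.run_service argument. run_service is a static method of the Services class in oslo.service: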
@staticmethod
def run_service(service, done):
    """Service start wrapper.

    :param service: service to run
    :param done: event to wait on until a shutdown is triggered
    :returns: None
    """
    try:
        # The concrete service is started here
        service.start()
    except Exception:
        LOG.exception('Error starting thread.')
        raise SystemExit(1)
    else:
        done.wait()
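The start-then-wait behaviour of run_service can be reproduced with a plain thread and a threading.Event. This is a minimal sketch: DemoService is a hypothetical service whose start() returns immediately, and the real code uses an eventlet-based event and green threads instead.

```python
import threading


class DemoService:
    """Hypothetical service whose start() returns right away."""

    def __init__(self):
        self.started = False

    def start(self):
        self.started = True


def run_service(service, done):
    try:
        service.start()
    except Exception:
        # A service that cannot start takes the worker down with it.
        raise SystemExit(1)
    else:
        # Keep the worker alive until a shutdown is signalled.
        done.wait()


service = DemoService()
done = threading.Event()
worker = threading.Thread(target=run_service, args=(service, done))
worker.start()
done.set()  # signal shutdown; run_service returns from done.wait()
worker.join(timeout=5)
print(service.started, worker.is_alive())
```

The else branch matters: start() is expected to return quickly, and the worker then parks on the event so that the service stays registered until shutdown.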
In the end, service.start() is what starts the service.
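To see how service.start() starts the service, go back to nova/conductor/manager.py. That file defines the ConductorManager class, which on initialization stores an instance of ComputeTaskManager (defined in the same file) in its additional_endpoints list. The endpoints in this list are the entry points that start registers later. ComputeTaskManager holds the logic with which the conductor service actually handles requests from the API service. Its code is as follows: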
@profiler.trace_cls("rpc")
class ComputeTaskManager(base.Base):
    """Namespace for compute methods.

    This class presents an rpc API for nova-conductor under the 'compute_task'
    namespace. The methods here are compute operations that are invoked
    by the API service. These methods see the operation to completion, which
    may involve coordinating activities on multiple compute nodes.
    """

    target = messaging.Target(namespace='compute_task', version='1.20')

    def __init__(self):
        super(ComputeTaskManager, self).__init__()
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        self.volume_api = cinder.API()
        self.image_api = image.API()
        self.network_api = network.API()
        self.servicegroup_api = servicegroup.API()
        self.scheduler_client = scheduler_client.SchedulerClient()
        self.report_client = self.scheduler_client.reportclient
        self.notifier = rpc.get_notifier('compute', CONF.host)
    ...
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True,
            request_spec=None, host_lists=None):
    ...
    def _schedule_instances(self, context, request_spec,
                            instance_uuids=None, return_alternates=False):
        scheduler_utils.setup_instance_group(context, request_spec)
        host_lists = self.scheduler_client.select_destinations(context,
                request_spec, instance_uuids, return_objects=True,
                return_alternates=return_alternates)
        return host_lists
    ...
    def schedule_and_build_instances(self, context, build_requests,
                                     request_specs, image,
                                     admin_password, injected_files,
                                     requested_networks, block_device_mapping,
                                     tags=None):
    ...
Once the conductor manager has stored ComputeTaskManager in its endpoint list, service.start() executes:
...
LOG.debug("Creating RPC server for service %s", self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [
    self.manager,
    baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
]
# ComputeTaskManager is merged into endpoints here
endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.NovaObjectSerializer()
# Build the rpcserver object, then start it
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
...
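How the endpoint list ends up containing the extra manager can be shown with plain objects. Everything below is a hypothetical stand-in for illustration: the Target tuple, the three Demo classes, and the "baseapi" namespace string are assumptions, and no messaging library is involved.

```python
from collections import namedtuple

# Hypothetical stand-in for a messaging target: just a namespace and version.
Target = namedtuple("Target", ["namespace", "version"])


class DemoTaskManager:
    target = Target(namespace="compute_task", version="1.20")


class DemoBaseAPI:
    target = Target(namespace="baseapi", version="1.0")  # illustrative name


class DemoManager:
    target = Target(namespace=None, version="3.0")

    def __init__(self):
        # The manager registers extra endpoints when it is constructed.
        self.additional_endpoints = [DemoTaskManager()]


def build_endpoints(manager, base_endpoint):
    # The manager itself and the base API always come first ...
    endpoints = [manager, base_endpoint]
    # ... followed by whatever the manager registered on its own.
    endpoints.extend(manager.additional_endpoints)
    return endpoints


endpoints = build_endpoints(DemoManager(), DemoBaseAPI())
print([e.target.namespace for e in endpoints])
```

Each endpoint carries its own namespace, which is what lets several endpoints share one topic without their method names colliding.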
rpc.get_server() is defined as:
def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    if profiler:
        serializer = ProfilerRequestContextSerializer(serializer)
    else:
        serializer = RequestContextSerializer(serializer)
    access_policy = dispatcher.DefaultRPCAccessPolicy
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer,
                                    access_policy=access_policy)
Following the call into messaging.get_rpc_server:
def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None, access_policy=None):
    """Construct an RPC server.

    :param transport: the messaging transport
    :type transport: Transport
    :param target: the exchange, topic and server to listen on
    :type target: Target
    :param endpoints: a list of endpoint objects
    :type endpoints: list
    :param executor: name of message executor - available values are
                     'eventlet' and 'threading'
    :type executor: str
    :param serializer: an optional entity serializer
    :type serializer: Serializer
    :param access_policy: an optional access policy.
           Defaults to DefaultRPCAccessPolicy
    :type access_policy: RPCAccessPolicyBase
    """
    dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer,
                                              access_policy)
    return RPCServer(transport, target, dispatcher, executor)
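The dispatcher built here is the piece that routes each incoming call to an endpoint method. A minimal sketch of that routing idea follows; it is not the oslo.messaging implementation. MiniDispatcher, DemoComputeTask, and the version rule (same major version, requested minor no higher than the endpoint's) are assumptions made for the example, and messages are plain dictionaries.

```python
class NoSuchMethod(Exception):
    pass


class UnsupportedVersion(Exception):
    pass


def is_compatible(endpoint_version, requested_version):
    # Assumed rule: majors must match and the endpoint's minor must be
    # at least the requested minor.
    e_major, e_minor = (int(p) for p in endpoint_version.split("."))
    r_major, r_minor = (int(p) for p in requested_version.split("."))
    return e_major == r_major and r_minor <= e_minor


class MiniDispatcher:
    def __init__(self, endpoints):
        self.endpoints = endpoints

    def dispatch(self, message):
        method = message["method"]
        args = message.get("args", {})
        namespace = message.get("namespace")
        version = message.get("version", "1.0")
        found_compatible = False
        for endpoint in self.endpoints:
            if getattr(endpoint, "namespace", None) != namespace:
                continue
            if not is_compatible(getattr(endpoint, "version", "1.0"), version):
                continue
            found_compatible = True
            if hasattr(endpoint, method):
                return getattr(endpoint, method)(**args)
        if found_compatible:
            raise NoSuchMethod(method)
        raise UnsupportedVersion(version)


class DemoComputeTask:
    """Hypothetical endpoint in the 'compute_task' namespace."""
    namespace = "compute_task"
    version = "1.20"

    def build_instances(self, count):
        return f"building {count} instance(s)"


dispatcher = MiniDispatcher([DemoComputeTask()])
reply = dispatcher.dispatch({"method": "build_instances",
                             "namespace": "compute_task",
                             "version": "1.5",
                             "args": {"count": 2}})
print(reply)
```

Matching on namespace and version before looking up the method name is what allows two endpoints to expose methods with the same name.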
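The dispatcher class hands each RPC request to the matching endpoint: it forwards the requested RPC method call to the method of the same name on a specific endpoint, for example ComputeTaskManager's live_migrate_instance. The dispatch code is shown below: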
def dispatch(self, incoming):
    """Dispatch an RPC message to the appropriate endpoint method.

    :param incoming: incoming message
    :type incoming: IncomingMessage
    :raises: NoSuchMethod, UnsupportedVersion
    """
    message = incoming.message
    ctxt = incoming.ctxt
    method = message.get('method')
    args = message.get('args', {})
    namespace = message.get('namespace')
    version = message.get('version', '1.0')
    found_compatible = False
    for endpoint in self.endpoints:
        target = getattr(endpoint, 'target', None)
        if not target:
            target = self._default_target
        if not (self._is_namespace(target, namespace) and
                self._is_compatible(target, version)):
            continue
        if hasattr(endpoint, method):
            if self.access_policy.is_allowed(endpoint, method):
                return self._do_dispatch(endpoint, method, ctxt, args)