码迷,mamicode.com
首页 > 其他好文 > 详细

neutron-server的启动流程(一)

时间:2016-05-07 07:12:45      阅读:955      评论:0      收藏:0      [点我收藏+]

标签:

neutron-server的启动包括RPC-server的创建,RPC-client的创建,WSGI server的创建,因此neutron-server不单单起到与其他组件中的api的功能。本文将RPC相关创建和WSGI server的创建两方面进行代码流程的分析。

查看setup.cfg文件找到neutron-server的代码入口。

neutron-server = neutron.cmd.eventlet.server:main

#/neutron/cmd/eventlet/server/__init__.py
def main():
    server.main()

#/neutron/server/__init__.py
def main():
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        pool = eventlet.GreenPool()

        neutron_api = service.serve_wsgi(service.NeutronApiService)
        api_thread = pool.spawn(neutron_api.wait)

        try:
            neutron_rpc = service.serve_rpc()
        except NotImplementedError:
            LOG.info(_LI("RPC was already started in parent process by "
                         "plugin."))
        else:
            rpc_thread = pool.spawn(neutron_rpc.wait)

            # api and rpc should die together.  When one dies, kill the other.
            rpc_thread.link(lambda gt: api_thread.kill())
            api_thread.link(lambda gt: rpc_thread.kill())

        pool.waitall()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)

1. WSGIserver创建

neutron_api = service.serve_wsgi(service.NeutronApiService)

#/neutron/service.py
def serve_wsgi(cls):

    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    return service

#/neutron/service.py:NeutronApiService
class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    @classmethod
    def create(cls, app_name='neutron'):

        # Setup logging early, supplying both the CLI options and the
        # configuration mapping from the config file
        # We only update the conf dict for the verbose and debug
        # flags. Everything else must be set up in the conf file...
        # Log the options used when starting if we're in debug mode...

        config.setup_logging()
        # Dump the initial option values
        cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
        service = cls(app_name)
        return service

serve_wsgi函数将传入的NeutronApiService对象进行创建,并执行该对象的start函数,创建WSGI sever。

#/neutron/service.py:WsgiService
    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

#/neutron/service.py
def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=cfg.CONF.api_workers)
    # Dump all option values here after all options are parsed
    cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server

其中,

app = config.load_paste_app(app_name)

为从api-paste.ini文件加载composite为neutron的相关信息。首先创建/neutron/wsgi.py:Server对象,执行Server对象的start函数,将创建WSGI server。

#/neutron/wsgi.py:Server
    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application."""
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

    def _launch(self, application, workers=0):
        service = WorkerService(self, application)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            if CONF.database.connection:
                api.get_engine().pool.dispose()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(wait_interval=1.0)
            self._server.launch_service(service, workers=workers)

其中,start函数中的host即为neutron.conf配置文件中bind_host参数,一般设置为管理网络IP,port为neutron.conf文件中的bind_port参数,一般neutron-server的bind_port参数设置为9696。首先在start函数调用self._get_socket函数创建一个socket去监听本机的9696端口。以下是我的OpenStack环境的neutron-server进程。

[root@jun neutron]# netstat -tnulp | grep 9696

tcp        0      0 192.168.118.1:9696      0.0.0.0:*               LISTEN      35298/python2

然后在_lauch函数中创建WSGI server进程(即创建子进程用于处理neutronclient发来的HTTP请求)。这里WSGI server进程的个数根据neutron.conf配置文件中的api_workers进行指定,一般为系统cpu的个数。

具体的创建WSGI server进程的代码分析可以参看《 keystone WSGI流程》文章。下面分析最终根据api-paste.ini文件加载的可调用的resources。api-paste.ini文件内容如下。

[composite:neutron]

use = egg:Paste#urlmap

/: neutronversions

/v2.0: neutronapi_v2_0

 

[composite:neutronapi_v2_0]

use = call:neutron.auth:pipeline_factory

noauth = request_id catch_errors extensions neutronapiapp_v2_0

keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

 

[filter:request_id]

paste.filter_factory = oslo.middleware:RequestId.factory

 

[filter:catch_errors]

paste.filter_factory = oslo.middleware:CatchErrors.factory

 

[filter:keystonecontext]

paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

 

[filter:authtoken]

paste.filter_factory = keystonemiddleware.auth_token:filter_factory

 

[filter:extensions]

paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

 

[app:neutronversions]

paste.app_factory = neutron.api.versions:Versions.factory

 

[app:neutronapiapp_v2_0]

paste.app_factory = neutron.api.v2.router:APIRouter.factory

composite为neutron时,有两个分支,一个是返回neutronversions的分支,一个是到composite为neutronapi_v2_0去调用相应的资源。我们发送HTTP请求到neutron,一般会调用/v2.0上的资源。比如create_port, update_port等等。查看composite为neutronapi_v2_0的部分,不管是noauth还是keystone,最终都将到app为neutronapiapp_v2_0的分支去加载资源。其中factory函数如下。

#/neutron/api/v2/router.py:APIRouter
class APIRouter(wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)

1.1 创建plugin(包括core plugin和service plugin)

#/neutron/api/v2/router.py:APIRouter
plugin = manager.NeutronManager.get_plugin()

#/neutron/manager.py:NeutronManager
    @classmethod
    def get_plugin(cls):
        # Return a weakref to minimize gc-preventing references.
        return weakref.proxy(cls.get_instance().plugin)

#/neutron/manager.py:NeutronManager
    @classmethod
    def get_instance(cls):
        # double checked locking
        if not cls.has_instance():
            cls._create_instance()
        return cls._instance

#/neutron/manager.py:NeutronManager
    @classmethod
    @utils.synchronized("manager")
    def _create_instance(cls):
        if not cls.has_instance():
            cls._instance = cls()

#/neutron/manager.py:NeutronManager
class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """
    _instance = None

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()

get_plugin函数返回NeutronManager的plugin(core plugin)的弱引用,那么core plugin是什么对象呢?core plugin和service plugin在NeutronManager的__init__函数创建的。其中core plugin和service plugin分别根据neutron.conf配置文件中的core_plugin参数和service_plugins参数进行创建。

1.1.1 core plugin的创建

我的OpenStack环境的neutron.conf配置文件中的core_plugin参数值为:

core_plugin =neutron.plugins.ml2.plugin.Ml2Plugin

其创建在_get_plugin_instance函数中执行。

#/neutron/manager.py:NeutronManager
    def _get_plugin_instance(self, namespace, plugin_provider):
        try:
            # Try to resolve plugin by name
            mgr = driver.DriverManager(namespace, plugin_provider)
            plugin_class = mgr.driver
        except RuntimeError as e1:
            # fallback to class name
            try:
                plugin_class = importutils.import_class(plugin_provider)
            except ImportError as e2:
                LOG.exception(_LE("Error loading plugin by name, %s"), e1)
                LOG.exception(_LE("Error loading plugin by class, %s"), e2)
                raise ImportError(_("Plugin not found."))
        return plugin_class()

利用stevedore模块创建core_plugin,即core_plugin为/neutron/plugins/ml2/plugin.py:Ml2Plugin对象。

#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    def __init__(self):
        # First load drivers, then initialize DB, then initialize drivers
        self.type_manager = managers.TypeManager()
        self.extension_manager = managers.ExtensionManager()
        self.mechanism_manager = managers.MechanismManager()
        super(Ml2Plugin, self).__init__()
        self.type_manager.initialize()
        self.extension_manager.initialize()
        self.mechanism_manager.initialize()

        self._setup_rpc()

        # REVISIT(rkukura): Use stevedore for these?
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )

        self.start_periodic_dhcp_agent_status_check()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))

由于type_manager,extension_manager和mechanism_manager的创建都类似,所以这里我们主要分析type_manager代码流程,其余的简要说明。

#/neutron/plugins/ml2/managers.py:TypeManager
class TypeManager(stevedore.named.NamedExtensionManager):
    """Manage network segment types using drivers."""

    def __init__(self):
        # Mapping from type name to DriverManager
        self.drivers = {}

        LOG.info(_LI("Configured type driver names: %s"),
                 cfg.CONF.ml2.type_drivers)
        super(TypeManager, self).__init__('neutron.ml2.type_drivers',
                                          cfg.CONF.ml2.type_drivers,
                                          invoke_on_load=True)
        LOG.info(_LI("Loaded type driver names: %s"), self.names())
        self._register_types()
        self._check_tenant_network_types(cfg.CONF.ml2.tenant_network_types)

这里,TypeManager对象中的drivers即为type driver对象。drivers根据/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的type_drivers参数去构造。在我的OpenStack环境中type_drivers参数值如下。

type_drivers = vlan,flat

两个driver对象的创建在TypeManager类的父类中完成。且创建两个对象被/stevedore/extension.py:Extension包裹。

#/neutron/plugins/ml2/managers.py:TypeManager
    def _register_types(self):
        for ext in self:
            network_type = ext.obj.get_type()
            if network_type in self.drivers:
                LOG.error(_LE("Type driver '%(new_driver)s' ignored because"
                              " type driver '%(old_driver)s' is already"
                              " registered for type '%(type)s'"),
                          {'new_driver': ext.name,
                           'old_driver': self.drivers[network_type].name,
                           'type': network_type})
            else:
                self.drivers[network_type] = ext
        LOG.info(_LI("Registered types: %s"), self.drivers.keys())

这里,将创建完成的typedriver对象register到self.drivers字典中,如下。

self.drivers: {‘flat‘: <stevedore.extension.Extension object at 0x428cad0>, ‘vlan‘: <stevedore.extension.Extension object at 0x4299150>}

如果调用实际的flat或vlan对于的type driver的函数,需要访问/stevedore/extension.py:Extension对象的obj成员变量,obj成员变量即为实际的type driver的对象。如下

‘flat’: <neutron.plugins.ml2.drivers.type_flat.FlatTypeDriver object at 0x46cfb50>

‘vlan’: <neutron.plugins.ml2.drivers.type_vlan.VlanTypeDriver object at 0x46a9bd0>

在register typedriver后,check tenant的network type是否在我们所register的type driver中,如果没有,则raise异常。其中tenant的network type是由/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的tenant_network_types参数设置。我的OpenStack环境的tenant_network_types参数配置如下。

tenant_network_types = vlan

#/neutron/plugins/ml2/managers.py:TypeManager
    def _check_tenant_network_types(self, types):
        self.tenant_network_types = []
        for network_type in types:
            if network_type in self.drivers:
                self.tenant_network_types.append(network_type)
            else:
                LOG.error(_LE("No type driver for tenant network_type: %s. "
                              "Service terminated!"), network_type)
                raise SystemExit(1)
        LOG.info(_LI("Tenant network_types: %s"), self.tenant_network_types)

所以执行_check_tenant_network_types函数时,tenant network type校验成功。

mechanism_manager的创建也是类似的,它管理的mechanism driver。其根据/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的mechanism_drivers参数去构造mechanism driver对象。我的OpenStack环境mechanism_drivers配置参数如下。

mechanism_drivers =linuxbridge

构建的mechanismdriver对象为:

<neutron.plugins.ml2.drivers.mech_linuxbridge.LinuxbridgeMechanismDriver object at 0x3f42b90>

extension_manager的创建也类似。

在type_manager,extension_manager和mechanism_manager创建完成后,执行initialize函数对所创建的driver进行初始化。比如type_manager的initialize函数如下。

#/neutron/plugins/ml2/managers.py:TypeManager
    def initialize(self):
        for network_type, driver in self.drivers.iteritems():
            LOG.info(_LI("Initializing driver for type '%s'"), network_type)
            driver.obj.initialize()

从上可以看出,最终会调用type_manager所管理的driver的initialize函数。

下面分析创建rpc-client的代码流程。即执行以下代码。

self._setup_rpc()

 

#/neutron/plugins/ml2/managers.py:TypeManager
    def _setup_rpc(self):
        self.notifier = rpc.AgentNotifierApi(topics.AGENT)
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )

这里topics.AGENT=’q-agent-notifier’。

#/neutron/plugins/ml2/rpc.py:AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API.

    API version history:
        1.0 - Initial version.
        1.1 - Added get_active_networks_info, create_dhcp_port,
              update_dhcp_port, and removed get_dhcp_port methods.

    """

    def __init__(self, topic):
        self.topic = topic
        self.topic_network_delete = topics.get_topic_name(topic,
                                                          topics.NETWORK,
                                                          topics.DELETE)
        self.topic_port_update = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.UPDATE)
        self.topic_port_delete = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.DELETE)

        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/common/topics.py
def get_topic_name(prefix, table, operation, host=None):
    """Create a topic name.

    The topic name needs to be synced between the agent and the
    plugin. The plugin will send a fanout message to all of the
    listening agents so that the agents in turn can perform their
    updates accordingly.

    :param prefix: Common prefix for the plugin/agent message queues.
    :param table: The table in question (NETWORK, SUBNET, PORT).
    :param operation: The operation that invokes notification (CREATE,
                      DELETE, UPDATE)
    :param host: Add host to the topic
    :returns: The topic name.
    """
    if host:
        return '%s-%s-%s.%s' % (prefix, table, operation, host)
    return '%s-%s-%s' % (prefix, table, operation)

其中/neutron/common/topics.py文件中定义了一些常量如下。

#/neutron/common/topics.py

NETWORK = ‘network‘

SUBNET = subnet

PORT = ‘port‘

SECURITY_GROUP = ‘security_group‘

L2POPULATION = ‘l2population‘

DVR = dvr

 

CREATE = ‘create‘

DELETE = ‘delete‘

UPDATE = ‘update‘

 

AGENT = ‘q-agent-notifier‘

PLUGIN = ‘q-plugin

L3PLUGIN = ‘q-l3-plugin

DHCP = ‘q-dhcp-notifer

FIREWALL_PLUGIN = ‘q-firewall-plugin

METERING_PLUGIN = ‘q-metering-plugin

LOADBALANCER_PLUGIN = ‘n-lbaas-plugin

 

L3_AGENT = ‘l3_agent‘

DHCP_AGENT = ‘dhcp_agent‘

METERING_AGENT = ‘metering_agent‘

LOADBALANCER_AGENT = ‘n-lbaas_agent‘

所以AgentNotifierApi对象创建了通过AMQP到topic为’q-agent-notifier’的RPC-server的绑定。即AgentNotifierApi可远程调用执行topic为’q-agent-notifier’的RPC-server上的函数。不过这里,并没有纯粹的调用topic为’q-agent-notifier’的RPC-server函数。而是通过prepare函数修改topic构建新的Target来调用3种另外的topic的RPC-server上的函数。这3种topic为(根据get_topic_name函数构造出来的):

‘q-agent-notifier-network-delete’

‘q-agent-notifier-port-update’

‘q-agent-notifier-port-delete’

那么对应的这3个topic的RPC-server是哪个服务构建的呢?通过AgentNotifierApi类的解释可知,应该是neutron-openvswitch-agent服务构建的。如下

#/neutron/plugins/opensvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent
    def setup_rpc(self):
        self.agent_id = 'ovs-agent-%s' % cfg.CONF.host
        self.topic = topics.AGENT
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.endpoints = [self]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.PORT, topics.DELETE],
                     [topics.NETWORK, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE],
                     [topics.DVR, topics.UPDATE]]
        if self.l2_pop:
            consumers.append([topics.L2POPULATION,
                              topics.UPDATE, cfg.CONF.host])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers,
                                                     start_listening=False)

#/neutron/agent/rpc.py
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
    """Create agent RPC consumers.

    :param endpoints: The list of endpoints to process the incoming messages.
    :param prefix: Common prefix for the plugin/agent message queues.
    :param topic_details: A list of topics. Each topic has a name, an
                          operation, and an optional host param keying the
                          subscription to topic.host for plugin calls.
    :param start_listening: if True, it starts the processing loop

    :returns: A common Connection.
    """

    connection = n_rpc.create_connection(new=True)
    for details in topic_details:
        topic, operation, node_name = itertools.islice(
            itertools.chain(details, [None]), 3)

        topic_name = topics.get_topic_name(prefix, topic, operation)
        connection.create_consumer(topic_name, endpoints, fanout=True)
        if node_name:
            node_topic_name = '%s.%s' % (topic_name, node_name)
            connection.create_consumer(node_topic_name,
                                       endpoints,
                                       fanout=False)
    if start_listening:
        connection.consume_in_threads()
    return connection

从上面的代码可以看出,neutron-openvswitch-agent构建的RPC-server将会最多有8个topic。即

‘q-agent-notifier-port-update’

‘q-agent-notifier-port-delete’

‘q-agent-notifier-network-delete’

‘q-agent-notifier-tunnel-update’

‘q-agent-notifier-tunnel-delete’

‘q-agent-notifier-security_group-update’

‘q-agent-notifier-dvr-update’

‘q-agent-notifier-l2population–update.jun2’

从上面8个topic可以看出,包括AgentNotifierApi类所构建RPC-client所需的topic。

不过,我的OpenStack环境采用的linuxbridge的mechanism driver,而在neutron-linuxbridge-agent服务启动时,创建RPC-server时,没有topic’ q-agent-notifier-port-delete’Target,所以采用的linuxbridgemechanism driver时,AgentNotifierApi对象不能调用port_delete函数。如下

#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
    def setup_rpc(self, physical_interfaces):
        if physical_interfaces:
            mac = utils.get_interface_mac(physical_interfaces[0])
        else:
            devices = ip_lib.IPWrapper().get_devices(True)
            if devices:
                mac = utils.get_interface_mac(devices[0].name)
            else:
                LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                              "Agent terminated!"))
                exit(1)
        self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
        LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

        self.topic = topics.AGENT
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        # RPC network init
        # Handle updates from service
        self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
                                                  self.sg_agent)]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE]]
        if cfg.CONF.VXLAN.l2_population:
            consumers.append([topics.L2POPULATION,
                              topics.UPDATE, cfg.CONF.host])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers)
        report_interval = cfg.CONF.AGENT.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)

下面分析另一个跟dhcp相关的RPC-client的创建。

        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (

            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

        )

其中self.agent_notifiers属性是从父类/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin继承而来。

#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }

DhcpAgentNotifyAPI对象的创建如下。

#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py:DhcpAgentNotifyAPI
class DhcpAgentNotifyAPI(object):
    """API for plugin to notify DHCP agent.

    This class implements the client side of an rpc interface.  The server side
    is neutron.agent.dhcp_agent.DhcpAgent.  For more information about changing
    rpc interfaces, please see doc/source/devref/rpc_api.rst.
    """
    # It seems dhcp agent does not support bulk operation
    VALID_RESOURCES = ['network', 'subnet', 'port']
    VALID_METHOD_NAMES = ['network.create.end',
                          'network.update.end',
                          'network.delete.end',
                          'subnet.create.end',
                          'subnet.update.end',
                          'subnet.delete.end',
                          'port.create.end',
                          'port.update.end',
                          'port.delete.end']

    def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
        self._plugin = plugin
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

其中RPC-client的topic的值为’dhcp_agent’。而对应的RPC-server是在neutron-dhcp-agent服务启动时创建的。

#neutron/agent/dhcp_agent.py
def main():
    register_options()
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(server).wait()

#neutron/service.py:Service
    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

#neutron/common/rpc.py:Service
    def start(self):
        super(Service, self).start()

        self.conn = create_connection(new=True)
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

总结一下,上面的coreplugin中的RPC-client与RPC-server的对应关系。

RPC-client

RPC-server

Class

service

endpoints

topic

service

AgentNotifierApi

neutron-server

(core plugin)

OVSNeutronAgent

q-agent-notifier-xxx-yyy

(topics.AGENT-xxx-yyy)

neutron-openvswitch-agent

LinuxBridgeRpcCallbacks

q-agent-notifier-xxx-yyy

(topics.AGENT-xxx-yyy)

neutron-linuxbridge-agent

DhcpAgentNotifyAPI

DhcpAgentWithStateReport

(inherit from DhcpAgent)

dhcp_agent

(topics.DHCP_AGENT)

neutron-dhcp-agent

其中,xxx表示资源(如port,network等),yyy表示操作(如update,delete等)。

至此_setup_rpc函数分析完成。

Ml2Plugin的__init__函数还剩下以下代码。

#neutron/plugins/ml2/plugin.py:Ml2Plugin.__init__
        # REVISIT(rkukura): Use stevedore for these?
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )

        self.start_periodic_dhcp_agent_status_check()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))

network_scheduler_driver是在neutron.conf配置文件中的参数,其默认值为:

network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler

所以self.network_scheduler为/neutron/scheduler/dhcp_agent_scheduler.py:ChanceScheduler对象。

start_periodic_dhcp_agent_status_check函数是一个周期性检测函数。

目前把core plugin的代码流程分析完成。下面分析service plugin的代码流程。

1.1.2 service plugin的创建

#/neutron/manager.py:NeutronManager.__init__
        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()

service plugin首先将core plugin载入到self.service_plugins中,然后执行self._load_service_plugins函数将自身的service plugin载入到self.service_plugins中。

#/neutron/manager.py:NeutronManager
    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports
        advanced services then loads classes provided in configuration.
        """
        # load services from the core plugin first
        self._load_services_from_core_plugin()

        plugin_providers = cfg.CONF.service_plugins
        LOG.debug("Loading service plugins: %s", plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue

            LOG.info(_LI("Loading Plugin: %s"), provider)
            plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                    provider)

            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            if plugin_inst.get_plugin_type() in self.service_plugins:
                raise ValueError(_("Multiple plugins for service "
                                   "%s were configured") %
                                 plugin_inst.get_plugin_type())

            self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

            # search for possible agent notifiers declared in service plugin
            # (needed by agent management extension)
            if (hasattr(self.plugin, 'agent_notifiers') and
                    hasattr(plugin_inst, 'agent_notifiers')):
                self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)

            LOG.debug("Successfully loaded %(type)s plugin. "
                      "Description: %(desc)s",
                      {"type": plugin_inst.get_plugin_type(),
                       "desc": plugin_inst.get_plugin_description()})

首先从core plugin中load service plugin。

#/neutron/manager.py:NeutronManager
    def _load_services_from_core_plugin(self):
        """Puts core plugin in service_plugins for supported services."""
        LOG.debug("Loading services supported by the core plugin")

        # supported service types are derived from supported extensions
        for ext_alias in getattr(self.plugin,
                                 "supported_extension_aliases", []):
            if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
                service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
                self.service_plugins[service_type] = self.plugin
                LOG.info(_LI("Service %s is supported by the core plugin"),
                         service_type)

其中EXT_TO_SERVICE_MAPPING的信息如下。

#/neutron/plugins/common/constants.py
# service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"


#maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
    'dummy': DUMMY,
    'lbaas': LOADBALANCER,
    'lbaasv2': LOADBALANCERV2,
    'fwaas': FIREWALL,
    'vpnaas': VPN,
    'metering': METERING,
    'router': L3_ROUTER_NAT
}

即从coreplugin(Ml2Plugin对象)的supported_extension_aliases函数返回的alias查找是否有EXT_TO_SERVICE_MAPPING相匹配的alias,如果有,则将core plugin载入到service plugin中。

#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    # List of supported extensions
    _supported_extension_aliases = ["provider", "external-net", "binding",
                                    "quotas", "security-group", "agent",
                                    "dhcp_agent_scheduler",
                                    "multi-provider", "allowed-address-pairs",
                                    "extra_dhcp_opt", "subnet_allocation",
                                    "net-mtu", "vlan-transparent"]

    @property
    def supported_extension_aliases(self):
        if not hasattr(self, '_aliases'):
            aliases = self._supported_extension_aliases[:]
            aliases += self.extension_manager.extension_aliases()
            sg_rpc.disable_security_group_extension_by_config(aliases)
            vlantransparent.disable_extension_by_config(aliases)
            self._aliases = aliases
        return self._aliases

supported_extension_aliases函数将_supported_extension_aliases列表的alias赋给self._aliases变量。不过_supported_extension_aliases列表有13个alias,在supported_extension_aliases函数会判断是否移除’security-group’和’vlan-transparent’这两个alias。拿security-group举例说明。

#/neutron/agent/securitygroups_rpc.py
#This is backward compatibility check for Havana
def _is_valid_driver_combination():
    return ((cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver and
              cfg.CONF.SECURITYGROUP.firewall_driver !=
             'neutron.agent.firewall.NoopFirewallDriver')) or
            (not cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver ==
             'neutron.agent.firewall.NoopFirewallDriver' or
              cfg.CONF.SECURITYGROUP.firewall_driver is None)
             ))

def is_firewall_enabled():
    if not _is_valid_driver_combination():
        LOG.warn(_LW("Driver configuration doesn't match with "
                     "enable_security_group"))

    return cfg.CONF.SECURITYGROUP.enable_security_group

def _disable_extension(extension, aliases):
    if extension in aliases:
        aliases.remove(extension)

def disable_security_group_extension_by_config(aliases):
    if not is_firewall_enabled():
        LOG.info(_LI('Disabled security-group extension.'))
        _disable_extension('security-group', aliases)
        LOG.info(_LI('Disabled allowed-address-pairs extension.'))
        _disable_extension('allowed-address-pairs', aliases)

因为/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的enable_security_group参数设置如下。

enable_security_group = True

所以is_firewall_enabled函数返回true,所以disable_security_group_extension_by_config函数不会将’security-group’ alias移除。这里需要注意的。_is_valid_driver_combination函数用于check security group配置参数是否正确。其匹配如下。

enable_security_group

firewall_driver

True

firewall_driver !=  ‘neutron.agent.firewall.NoopFirewallDriver

False

firewall_driver == ‘neutron.agent.firewall.NoopFirewallDriver

or firewall_driver is None

’vlan-transparent’ alias的remove比较简单。

#/neutron/extensions/vlantransparent.py
def disable_extension_by_config(aliases):
    if not cfg.CONF.vlan_transparent:
        if 'vlan-transparent' in aliases:
            aliases.remove('vlan-transparent')
        LOG.info(_LI('Disabled vlantransparent extension.'))

vlan_transparent 为neutron.conf配置文件中的参数。如下

vlan_transparent = False

所以’vlan-transparent’alias被remove了。

最终supported_extension_aliases函数返回的alias为12个:

[‘provider‘, ‘external-net‘, ‘binding‘, ‘quotas‘, ‘security-group‘, ‘agent‘, ‘dhcp_agent_scheduler‘, ‘multi-provider‘, ‘allowed-address-pairs‘, ‘extra_dhcp_opt‘, ‘subnet_allocation‘, ‘net-mtu‘]

这里没有EXT_TO_SERVICE_MAPPING相匹配的alias。因此执行完_load_services_from_core_plugin函数后,service plugin目前还是只有core plugin被加载其中。

继续回到/neutron/manager.py:NeutronManager的_load_service_plugins函数。其内部在执行完_load_services_from_core_plugin函数后,将根据neutron.conf配置文件中的service_plugins参数去加载和创建service plugin对象。

service_plugins =neutron.services.l3_router.l3_router_plugin.L3RouterPlugin

所以最终的self.service_plugins变量的值为:

{‘L3_ROUTER_NAT‘: <neutron.services.l3_router.l3_router_plugin.L3RouterPlugin object at 0x42038d0>, ‘CORE‘: <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x360d910>}

其中L3_ROUTER_NAT由get_plugin_type获取的。

#/neutron/plugins/common/constants.py
L3_ROUTER_NAT = "L3_ROUTER_NAT"

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
    def get_plugin_type(self):
        return constants.L3_ROUTER_NAT

下面我们分析serviceplugin中的L3RouterPlugin对象的创建。

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
                     extraroute_db.ExtraRoute_db_mixin,
                     l3_hamode_db.L3_HA_NAT_db_mixin,
                     l3_gwmode_db.L3_NAT_db_mixin,
                     l3_dvrscheduler_db.L3_DVRsch_db_mixin,
                     l3_hascheduler_db.L3_HA_scheduler_db_mixin):

    """Implementation of the Neutron L3 Router Service Plugin.

    This class implements a L3 service plugin that provides
    router and floatingip resources and manages associated
    request/response.
    All DB related work is implemented in classes
    l3_db.L3_NAT_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin,
    l3_dvr_db.L3_NAT_with_dvr_db_mixin, and extraroute_db.ExtraRoute_db_mixin.
    """
    supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
                                   "extraroute", "l3_agent_scheduler",
                                   "l3-ha"]

    def __init__(self):
        self.setup_rpc()
        self.router_scheduler = importutils.import_object(
            cfg.CONF.router_scheduler_driver)
        self.start_periodic_l3_agent_status_check()
        super(L3RouterPlugin, self).__init__()
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()

首先,创建RPC-client。

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
    def setup_rpc(self):
        # RPC support
        self.topic = topics.L3PLUGIN
        self.conn = n_rpc.create_connection(new=True)
        self.agent_notifiers.update(
            {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
        self.endpoints = [l3_rpc.L3RpcCallback()]
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()

其中,

#/neutron/common/topics.py
L3PLUGIN = 'q-l3-plugin'

其中self.agent_notifiers属性是从父类/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin继承而来。

#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }

self.agent_notifiers中的AGENT_TYPE_L3所对应的value值为L3AgentNotifyAPI对象,如下。

#/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py:L3AgentNotifyAPI
class L3AgentNotifyAPI(object):
    """API for plugin to notify L3 agent."""

    def __init__(self, topic=topics.L3_AGENT):
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

L3AgentNotifyAPI对象创建了topic为L3_AGENT = ‘l3_agent‘的RPC-client。而topic为L3_AGENT =‘l3_agent‘的RPC-server是由neutron-l3-agent服务启动时创建的。

#/neutron/agent/l3_agent.py
def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
service.launch(server).wait()

#neutron/service.py:Service
    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

#neutron/common/rpc.py:Service
    def start(self):
        super(Service, self).start()

        self.conn = create_connection(new=True)
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

同时setup_rpc也创建了一个topic为L3PLUGIN = ‘q-l3-plugin‘的RPC-server。

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.setup_rpc
        self.endpoints = [l3_rpc.L3RpcCallback()]
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()

其endpoints为/neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback对象。其中,/neutron/agent/l3/agent.py:L3PluginApi将创建topic为为L3PLUGIN = ‘q-l3-plugin‘的RPC-client(neutron-l3-agent服务启动时创建的)去调用/neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback类中的函数。

#/neutron/agent/l3/agent.py:L3PluginApi
class L3PluginApi(object):
    """Agent side of the l3 agent RPC API.

    API version history:
        1.0 - Initial version.
        1.1 - Floating IP operational status updates
        1.2 - DVR support: new L3 plugin methods added.
              - get_ports_by_subnet
              - get_agent_gateway_port
              Needed by the agent when operating in DVR/DVR_SNAT mode
        1.3 - Get the list of activated services
        1.4 - Added L3 HA update_router_state. This method was reworked in
              to update_ha_routers_states
        1.5 - Added update_ha_routers_states

    """

    def __init__(self, topic, host):
        self.host = host
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/agent/l3/agent.py:L3NATAgent
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                 ha.AgentMixin,
                 dvr.AgentMixin,
                 manager.Manager):
    """Manager for L3NatAgent

        API version history:
        1.0 initial Version
        1.1 changed the type of the routers parameter
            to the routers_updated method.
            It was previously a list of routers in dict format.
            It is now a list of router IDs only.
            Per rpc versioning rules,  it is backwards compatible.
        1.2 - DVR support: new L3 agent methods added.
              - add_arp_entry
              - del_arp_entry
              Needed by the L3 service when dealing with DVR
    """
    target = oslo_messaging.Target(version='1.2')

    def __init__(self, host, conf=None):
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        self.router_info = {}

        self._check_config_params()

        self.process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='router')

              ... ... ...

        self.context = n_context.get_admin_context_without_session()
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        self.fullsync = True
        ... ... ...

        self._queue = queue.RouterProcessingQueue()
        super(L3NATAgent, self).__init__(conf=self.conf)

        self.target_ex_net_id = None
        self.use_ipv6 = ipv6_utils.is_enabled()

        if self.conf.enable_metadata_proxy:
            self.metadata_driver = metadata_driver.MetadataDriver(self)

#/neutron/agent/l3/agent.py:L3NATAgentWithStateReport
class L3NATAgentWithStateReport(L3NATAgent):

    def __init__(self, host, conf=None):
        self.use_call = True
        super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        self.agent_state = {
            'binary': 'neutron-l3-agent',
            'host': host,
            'topic': topics.L3_AGENT,
            'configurations': {
                'agent_mode': self.conf.agent_mode,
                'use_namespaces': self.conf.use_namespaces,
                'router_id': self.conf.router_id,
                'handle_internal_only_routers':
                self.conf.handle_internal_only_routers,
                'external_network_bridge': self.conf.external_network_bridge,
                'gateway_external_network_id':
                self.conf.gateway_external_network_id,
                'interface_driver': self.conf.interface_driver},
            'start_flag': True,
            'agent_type': l3_constants.AGENT_TYPE_L3}
        report_interval = self.conf.AGENT.report_interval
        if report_interval:
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            self.heartbeat.start(interval=report_interval)

在neutron-l3-agent服务启动时,会创建L3NATAgentWithStateReport对象,该对象会初始化父类L3NATAgent类,L3NATAgent类将构建L3PluginApi对象(self.plugin_rpc= L3PluginApi(topics.L3PLUGIN, host)),从而L3PluginApi类中创建topic为为L3PLUGIN =‘q-l3-plugin‘的RPC-client。

在这里总结一下service plugin的RPC相关操作。

RPC-client

RPC-server

Class

service

endpoints

topic

service

L3AgentNotifyAPI

neutron-server

(service plugin)

L3NATAgentWithStateReport

(inherit from L3NATAgent)

l3_agent

(topics.L3_AGENT)

neutron-l3-agent

L3PluginApi

neutron-l3-agent

L3RpcCallback

q-l3-plugin

(topics.L3PLUGIN)

neutron-server

至此,L3RouterPlugin的__init__中setup_rpc函数分析完成,创建的RPC相关信息如上表所示。在L3RouterPlugin的__init__函数中还剩下以下重要的知识点。即Neutron Callback System。不过在L3RouterPlugin的__init__函数中只是注册了一些函数,在合适的地方将调用registry.notify函数进行执行注册的函数。如下

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.__init__
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()

这里,我将利用l3_db.subscribe()举例说明。

#/ neutron/db/l3_db.py
def subscribe():
    registry.subscribe(
        _prevent_l3_port_delete_callback, resources.PORT, events.BEFORE_DELETE)
    registry.subscribe(
        _notify_routers_callback, resources.PORT, events.AFTER_DELETE)

# NOTE(armax): multiple l3 service plugins (potentially out of tree) inherit
# from l3_db and may need the callbacks to be processed. Having an implicit
# subscription (through the module import) preserves the existing behavior,
# and at the same time it avoids fixing it manually in each and every l3 plugin
# out there. That said, The subscription is also made explicit in the
# reference l3 plugin. The subscription operation is idempotent so there is no
# harm in registering the same callback multiple times.
subscribe()

首先,我们看看如何subscribe的?

#/neutron/callbacks/registry.py
def _get_callback_manager():
    global CALLBACK_MANAGER
    if CALLBACK_MANAGER is None:
        CALLBACK_MANAGER = manager.CallbacksManager()
    return CALLBACK_MANAGER

#/neutron/callbacks/registry.py
def subscribe(callback, resource, event):
    _get_callback_manager().subscribe(callback, resource, event)

#/neutron/callbacks/manager.py:CallbacksManager
    def subscribe(self, callback, resource, event):
        """Subscribe callback for a resource event.

        The same callback may register for more than one event.

        :param callback: the callback. It must raise or return a boolean.
        :param resource: the resource. It must be a valid resource.
        :param event: the event. It must be a valid event.
        """
        LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s",
                  {'callback': callback, 'resource': resource, 'event': event})
        if resource not in resources.VALID:
            raise exceptions.Invalid(element='resource', value=resource)
        if event not in events.VALID:
            raise exceptions.Invalid(element='event', value=event)

        callback_id = _get_id(callback)
        self._callbacks[resource][event][callback_id] = weakref.proxy(callback)
        # We keep a copy of callbacks to speed the unsubscribe operation.
        if callback_id not in self._index:
            self._index[callback_id] = collections.defaultdict(set)
        self._index[callback_id][resource].add(event)

subscribe函数根据resource,event和callback_id 3个属性来标示callback函数。且将subscribe的callback函数保存到self._callbacks字典中。其中resource和event定义在如下文件中。

#/neutron/callbacks/resources.py
PORT = 'port'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
ROUTER_INTERFACE = 'router_interface'
SECURITY_GROUP = 'security_group'
SECURITY_GROUP_RULE = 'security_group_rule'

VALID = (
    PORT,
    ROUTER,
    ROUTER_GATEWAY,
    ROUTER_INTERFACE,
    SECURITY_GROUP,
    SECURITY_GROUP_RULE,
)

#/neutron/callbacks/events.py
BEFORE_CREATE = 'before_create'
BEFORE_READ = 'before_read'
BEFORE_UPDATE = 'before_update'
BEFORE_DELETE = 'before_delete'

AFTER_CREATE = 'after_create'
AFTER_READ = 'after_read'
AFTER_UPDATE = 'after_update'
AFTER_DELETE = 'after_delete'

ABORT_CREATE = 'abort_create'
ABORT_READ = 'abort_read'
ABORT_UPDATE = 'abort_update'
ABORT_DELETE = 'abort_delete'

ABORT = 'abort_'
BEFORE = 'before_'

VALID = (
    BEFORE_CREATE,
    BEFORE_READ,
    BEFORE_UPDATE,
    BEFORE_DELETE,
    AFTER_CREATE,
    AFTER_READ,
    AFTER_UPDATE,
    AFTER_DELETE,
    ABORT_CREATE,
    ABORT_READ,
    ABORT_UPDATE,
    ABORT_DELETE,
)

当我们需要调用subscribe的callback函数时,将registry.notify函数。如下

#/neutron/callbacks/registry.py
def notify(resource, event, trigger, **kwargs):
    _get_callback_manager().notify(resource, event, trigger, **kwargs)

#/neutron/callbacks/manager.py:CallbacksManager
    def notify(self, resource, event, trigger, **kwargs):
        """Notify all subscribed callback(s).

        Dispatch the resource's event to the subscribed callbacks.

        :param resource: the resource.
        :param event: the event.
        :param trigger: the trigger. A reference to the sender of the event.
        """
        errors = self._notify_loop(resource, event, trigger, **kwargs)
        if errors and event.startswith(events.BEFORE):
            abort_event = event.replace(
                events.BEFORE, events.ABORT)
            self._notify_loop(resource, abort_event, trigger)
            raise exceptions.CallbackFailure(errors=errors)

#/neutron/callbacks/manager.py:CallbacksManager
    def _notify_loop(self, resource, event, trigger, **kwargs):
        """The notification loop."""
        LOG.info(_LI("Notify callbacks for %(resource)s, %(event)s"),
                 {'resource': resource, 'event': event})

        errors = []
        # TODO(armax): consider using a GreenPile
        for callback_id, callback in self._callbacks[resource][event].items():
            try:
                LOG.info(_LI("Calling callback %s"), callback_id)
                callback(resource, event, trigger, **kwargs)
            except Exception as e:
                LOG.exception(_LE("Error during notification for "
                                  "%(callback)s %(resource)s, %(event)s"),
                              {'callback': callback_id,
                               'resource': resource,
                               'event': event})
                errors.append(exceptions.NotificationError(callback_id, e))
        return errors

最终在/neutron/callbacks/manager.py:CallbacksManager中的_notify_loop函数执行该callback函数。

下面,我将利用官网的例子进行介绍。

from neutron.callbacks import events

from neutron.callbacks import resources

from neutron.callbacks import registry

 

def callback1(resource, event, trigger, **kwargs):

    print ‘Callback1 called by trigger: ‘, trigger

    print ‘kwargs: ‘, kwargs

 

def callback2(resource, event, trigger, **kwargs):

    print ‘Callback2 called by trigger: ‘, trigger

    print ‘kwargs: ‘, kwargs

 

# B and C express interest with I

registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE)

registry.subscribe(callback2, resources.ROUTER, events.BEFORE_CREATE)

print ‘Subscribed‘

 

# A notifies

def do_notify():

    kwargs = {‘foo‘: ‘bar‘}

    registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)

 

print ‘Notifying...‘

do_notify()

执行上面的代码,其输出为:

Subscribed

Notifying...

Callback1 called by trigger:  <function do_notify at 0x1e86578>

kwargs:  {‘foo‘: ‘bar‘}

Callback2 called by trigger:  <function do_notify at 0x1e86578>

kwargs:  {‘foo‘: ‘bar‘}

因为subscribecallback1和callback2时,都是利用相同的resource(resources.ROUTER)和相同的event(events.BEFORE_CREATE)去subscribe的。所以最终在调用_notify_loop函数时,利用for循环,将执行callback1和callback2函数。

至此,core plugin和service plugin的创建代码流程分析完成。总结一下。

core plugin的创建涉及到type_manager管理的type driver(如vlan,flat)的创建, mechanism_manager管理的mechanism driver(如linuxbridge, openvswitch)的创建以及extension_manager管理的extension driver的创建(不过我的OpenStack并没使用extension driver)。还涉及到RPC-client的创建。

service plugin的创建涉及RPC-client和RPC-server的创建 以及使用Neutron Callback System 注册了一些callback函数。

这里总结一下coreresource(core API)与extension resource(extension API)的关系。

类似于nova对API资源的管理方式,neutron也将基于各种网络抽象得到的API资源分为coreresource(core API)与extension resource(extension API),core API只对应L2层的network/subnet/port/subnetpool(Kilo版本添加进来的)四种资源,其余的各层抽象都涵盖在extension API的范围。

Extension API有两种方式扩展现有的资源,一种是为network/subnet/port/subnetpool增加属性,比如port binding扩展,还有一种就是增加一些新的资源,比如VPNaaS等。其结构如下

技术分享

由于neutron-server涉及的代码流程较多,对于extension resource和RPC创建在下一篇文章进行分析。

PS:由于《nova boot代码流程分析(三):nova与neutron的交互》没有分析完成,所以待neutron-server分析完成后,再分析nova与neutron的交互的剩下内容。

neutron-server的启动流程(一)

标签:

原文地址:http://blog.csdn.net/gj19890923/article/details/51333899

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!