The startup of neutron-server involves creating RPC servers, RPC clients, and a WSGI server, so neutron-server does more than just serve as the API endpoint for the other components. This article analyzes the code flow of the RPC-related setup and of the WSGI server creation.
Looking at setup.cfg, the code entry point of neutron-server is:

neutron-server = neutron.cmd.eventlet.server:main
#/neutron/cmd/eventlet/server/__init__.py
def main():
    server.main()

#/neutron/server/__init__.py
def main():
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        pool = eventlet.GreenPool()

        neutron_api = service.serve_wsgi(service.NeutronApiService)
        api_thread = pool.spawn(neutron_api.wait)

        try:
            neutron_rpc = service.serve_rpc()
        except NotImplementedError:
            LOG.info(_LI("RPC was already started in parent process by "
                         "plugin."))
        else:
            rpc_thread = pool.spawn(neutron_rpc.wait)

            # api and rpc should die together. When one dies, kill the other.
            rpc_thread.link(lambda gt: api_thread.kill())
            api_thread.link(lambda gt: rpc_thread.kill())

        pool.waitall()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)
neutron_api = service.serve_wsgi(service.NeutronApiService)
#/neutron/service.py
def serve_wsgi(cls):
    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))
    return service

#/neutron/service.py:NeutronApiService
class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    @classmethod
    def create(cls, app_name='neutron'):
        # Setup logging early, supplying both the CLI options and the
        # configuration mapping from the config file
        # We only update the conf dict for the verbose and debug
        # flags. Everything else must be set up in the conf file...
        # Log the options used when starting if we're in debug mode...
        config.setup_logging()
        # Dump the initial option values
        cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
        service = cls(app_name)
        return service
serve_wsgi instantiates the NeutronApiService class passed to it (via its create method) and then calls the resulting object's start method, which creates the WSGI server.
#/neutron/service.py:WsgiService
def start(self):
    self.wsgi_app = _run_wsgi(self.app_name)

#/neutron/service.py
def _run_wsgi(app_name):

    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=cfg.CONF.api_workers)
    # Dump all option values here after all options are parsed
    cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server
Here,

app = config.load_paste_app(app_name)
loads the composite named neutron from the api-paste.ini file. A /neutron/wsgi.py:Server object is then created, and its start method is executed to bring up the WSGI server.
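For reference, config.load_paste_app is essentially a thin wrapper around paste.deploy. A minimal sketch of the equivalent call (the ini path below is an assumption for illustration):

from paste import deploy

# Resolve the [composite:neutron] section of api-paste.ini and return a
# WSGI callable (the urlmap composite wrapping the v2.0 pipeline).
app = deploy.loadapp('config:/etc/neutron/api-paste.ini', name='neutron')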
#/neutron/wsgi.py:Server
def start(self, application, port, host='0.0.0.0', workers=0):
    """Run a WSGI server with the given application."""
    self._host = host
    self._port = port
    backlog = CONF.backlog

    self._socket = self._get_socket(self._host,
                                    self._port,
                                    backlog=backlog)

    self._launch(application, workers)

def _launch(self, application, workers=0):
    service = WorkerService(self, application)
    if workers < 1:
        # The API service should run in the current process.
        self._server = service
        service.start()
        systemd.notify_once()
    else:
        # dispose the whole pool before os.fork, otherwise there will
        # be shared DB connections in child processes which may cause
        # DB errors.
        if CONF.database.connection:
            api.get_engine().pool.dispose()
        # The API service runs in a number of child processes.
        # Minimize the cost of checking for child exit by extending the
        # wait interval past the default of 0.01s.
        self._server = common_service.ProcessLauncher(wait_interval=1.0)
        self._server.launch_service(service, workers=workers)
Here the host argument of start is the bind_host parameter from the neutron.conf configuration file, usually set to the management-network IP, and port is the bind_port parameter, which for neutron-server is usually 9696. start first calls self._get_socket to create a socket listening on local port 9696. Below is the neutron-server process in my OpenStack environment.
[root@jun neutron]# netstat -tnulp | grep 9696
tcp        0      0 192.168.118.1:9696      0.0.0.0:*               LISTEN      35298/python2
Then the _launch function creates the WSGI server worker processes (i.e. the child processes that handle HTTP requests sent by neutronclient). The number of worker processes is set by the api_workers parameter in neutron.conf, usually the number of CPUs on the host.
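Under the hood, _get_socket boils down to an eventlet.listen call. A minimal sketch, with the host/port values from this environment hard-coded for illustration:

import eventlet

# Bind the listening socket that every worker process accept()s on.
sock = eventlet.listen(('192.168.118.1', 9696), backlog=4096)
# Each WorkerService ultimately serves HTTP requests from this socket,
# roughly: eventlet.wsgi.server(sock, app)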
For a detailed analysis of how the WSGI server processes are created, see the article 《keystone WSGI流程》. Next we analyze the callable resources that are eventually loaded according to api-paste.ini, whose content is shown below.
[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = request_id catch_errors extensions neutronapiapp_v2_0
keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[filter:request_id]
paste.filter_factory = oslo.middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo.middleware:CatchErrors.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory
The neutron composite has two branches: one returns neutronversions, the other goes to the neutronapi_v2_0 composite to dispatch to the corresponding resources. HTTP requests sent to neutron generally target resources under /v2.0, e.g. create_port, update_port, and so on. Looking at the neutronapi_v2_0 composite, both the noauth and the keystone pipelines end up loading resources through the neutronapiapp_v2_0 app, whose factory function is shown below.
#/neutron/api/v2/router.py:APIRouter
class APIRouter(wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params,
                allow_bulk=allow_bulk,
                parent=parent,
                allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'],
                          resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)
#/neutron/api/v2/router.py:APIRouter
plugin = manager.NeutronManager.get_plugin()

#/neutron/manager.py:NeutronManager
@classmethod
def get_plugin(cls):
    # Return a weakref to minimize gc-preventing references.
    return weakref.proxy(cls.get_instance().plugin)

#/neutron/manager.py:NeutronManager
@classmethod
def get_instance(cls):
    # double checked locking
    if not cls.has_instance():
        cls._create_instance()
    return cls._instance

#/neutron/manager.py:NeutronManager
@classmethod
@utils.synchronized("manager")
def _create_instance(cls):
    if not cls.has_instance():
        cls._instance = cls()

#/neutron/manager.py:NeutronManager
class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """
    _instance = None

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()
get_plugin returns a weak reference to NeutronManager's plugin (the core plugin). So what object is the core plugin? Both the core plugin and the service plugins are created in NeutronManager's __init__, from the core_plugin and service_plugins parameters in neutron.conf respectively.
In my OpenStack environment the core_plugin parameter in neutron.conf is:

core_plugin = neutron.plugins.ml2.plugin.Ml2Plugin
It is instantiated in the _get_plugin_instance function.
#/neutron/manager.py:NeutronManager
def _get_plugin_instance(self, namespace, plugin_provider):
    try:
        # Try to resolve plugin by name
        mgr = driver.DriverManager(namespace, plugin_provider)
        plugin_class = mgr.driver
    except RuntimeError as e1:
        # fallback to class name
        try:
            plugin_class = importutils.import_class(plugin_provider)
        except ImportError as e2:
            LOG.exception(_LE("Error loading plugin by name, %s"), e1)
            LOG.exception(_LE("Error loading plugin by class, %s"), e2)
            raise ImportError(_("Plugin not found."))
    return plugin_class()
The core plugin is created with the help of the stevedore module; the resulting core plugin is a /neutron/plugins/ml2/plugin.py:Ml2Plugin object.
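Note that because core_plugin is given here as a full class path, the DriverManager lookup by entry-point name raises RuntimeError and the importutils.import_class fallback actually loads it; with a short alias (e.g. 'ml2', assuming the usual entry point in neutron's setup.cfg) the stevedore path would be taken. A minimal standalone sketch of that lookup:

from stevedore import driver

# namespace is CORE_PLUGINS_NAMESPACE ('neutron.core_plugins'); 'ml2' is
# the entry-point alias assumed to map to neutron.plugins.ml2.plugin:Ml2Plugin.
mgr = driver.DriverManager(namespace='neutron.core_plugins', name='ml2')
plugin_class = mgr.driver   # the Ml2Plugin class itself
plugin = plugin_class()     # _get_plugin_instance returns plugin_class()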
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
def __init__(self):
    # First load drivers, then initialize DB, then initialize drivers
    self.type_manager = managers.TypeManager()
    self.extension_manager = managers.ExtensionManager()
    self.mechanism_manager = managers.MechanismManager()
    super(Ml2Plugin, self).__init__()
    self.type_manager.initialize()
    self.extension_manager.initialize()
    self.mechanism_manager.initialize()
    self._setup_rpc()

    # REVISIT(rkukura): Use stevedore for these?
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver
    )

    self.start_periodic_dhcp_agent_status_check()
    LOG.info(_LI("Modular L2 Plugin initialization complete"))
Since type_manager, extension_manager, and mechanism_manager are created in much the same way, we analyze the type_manager code flow in detail and only touch on the others briefly.
#/neutron/plugins/ml2/managers.py:TypeManager
class TypeManager(stevedore.named.NamedExtensionManager):
    """Manage network segment types using drivers."""

    def __init__(self):
        # Mapping from type name to DriverManager
        self.drivers = {}

        LOG.info(_LI("Configured type driver names: %s"),
                 cfg.CONF.ml2.type_drivers)
        super(TypeManager, self).__init__('neutron.ml2.type_drivers',
                                          cfg.CONF.ml2.type_drivers,
                                          invoke_on_load=True)
        LOG.info(_LI("Loaded type driver names: %s"), self.names())
        self._register_types()
        self._check_tenant_network_types(cfg.CONF.ml2.tenant_network_types)
Here the drivers attribute of the TypeManager object holds the type driver objects. The drivers are constructed from the type_drivers parameter in the /etc/neutron/plugins/ml2/ml2_conf.ini configuration file. In my OpenStack environment the type_drivers parameter is:

type_drivers = vlan,flat
The two driver objects are created in TypeManager's parent class, and each created object is wrapped in a /stevedore/extension.py:Extension object.
#/neutron/plugins/ml2/managers.py:TypeManager
def _register_types(self):
    for ext in self:
        network_type = ext.obj.get_type()
        if network_type in self.drivers:
            LOG.error(_LE("Type driver '%(new_driver)s' ignored because"
                          " type driver '%(old_driver)s' is already"
                          " registered for type '%(type)s'"),
                      {'new_driver': ext.name,
                       'old_driver': self.drivers[network_type].name,
                       'type': network_type})
        else:
            self.drivers[network_type] = ext
    LOG.info(_LI("Registered types: %s"), self.drivers.keys())
Here each fully constructed type driver is registered into the self.drivers dict, which ends up as:

self.drivers:
{'flat': <stevedore.extension.Extension object at 0x428cad0>,
 'vlan': <stevedore.extension.Extension object at 0x4299150>}
To call a function of the actual flat or vlan type driver, you go through the obj attribute of the /stevedore/extension.py:Extension object; obj holds the real type driver instance:

'flat': <neutron.plugins.ml2.drivers.type_flat.FlatTypeDriver object at 0x46cfb50>
'vlan': <neutron.plugins.ml2.drivers.type_vlan.VlanTypeDriver object at 0x46a9bd0>
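In other words, calling into a concrete type driver goes through the Extension wrapper. A minimal sketch (type_manager stands for the TypeManager instance above):

# The drivers dict maps network_type -> stevedore Extension; the real
# driver instance hangs off the Extension's .obj attribute.
vlan_ext = type_manager.drivers['vlan']   # stevedore.extension.Extension
vlan_driver = vlan_ext.obj                # VlanTypeDriver instance
print(vlan_driver.get_type())             # -> 'vlan'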
After registering the type drivers, TypeManager checks whether each tenant network type has a registered type driver and raises an exception if not. The tenant network types are set by the tenant_network_types parameter in /etc/neutron/plugins/ml2/ml2_conf.ini; in my OpenStack environment:

tenant_network_types = vlan
#/neutron/plugins/ml2/managers.py:TypeManager
def _check_tenant_network_types(self, types):
    self.tenant_network_types = []
    for network_type in types:
        if network_type in self.drivers:
            self.tenant_network_types.append(network_type)
        else:
            LOG.error(_LE("No type driver for tenant network_type: %s. "
                          "Service terminated!"), network_type)
            raise SystemExit(1)
    LOG.info(_LI("Tenant network_types: %s"), self.tenant_network_types)
So when _check_tenant_network_types runs, the tenant network type validates successfully.
mechanism_manager is created in a similar way; it manages the mechanism drivers, which are constructed from the mechanism_drivers parameter in /etc/neutron/plugins/ml2/ml2_conf.ini. In my OpenStack environment the mechanism_drivers parameter is:

mechanism_drivers = linuxbridge
The resulting mechanism driver object is:

<neutron.plugins.ml2.drivers.mech_linuxbridge.LinuxbridgeMechanismDriver object at 0x3f42b90>
extension_manager is created in the same way.
After type_manager, extension_manager, and mechanism_manager have been created, their initialize functions are executed to initialize the drivers they loaded. For example, TypeManager's initialize function:
#/neutron/plugins/ml2/managers.py:TypeManager
def initialize(self):
    for network_type, driver in self.drivers.iteritems():
        LOG.info(_LI("Initializing driver for type '%s'"), network_type)
        driver.obj.initialize()
As shown above, this ends up calling the initialize function of every driver that type_manager manages.

Next we analyze the code flow that creates the RPC clients, i.e. the following call.

self._setup_rpc()
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
def _setup_rpc(self):
    self.notifier = rpc.AgentNotifierApi(topics.AGENT)
    self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
        dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
    )
Here topics.AGENT = 'q-agent-notifier'.
#/neutron/plugins/ml2/rpc.py:AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API.

    API version history:
        1.0 - Initial version.
        1.1 - Added get_active_networks_info, create_dhcp_port,
              update_dhcp_port, and removed get_dhcp_port methods.
    """

    def __init__(self, topic):
        self.topic = topic
        self.topic_network_delete = topics.get_topic_name(topic,
                                                          topics.NETWORK,
                                                          topics.DELETE)
        self.topic_port_update = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.UPDATE)
        self.topic_port_delete = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.DELETE)
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/common/topics.py
def get_topic_name(prefix, table, operation, host=None):
    """Create a topic name.

    The topic name needs to be synced between the agent and the plugin.
    The plugin will send a fanout message to all of the listening agents
    so that the agents in turn can perform their updates accordingly.

    :param prefix: Common prefix for the plugin/agent message queues.
    :param table: The table in question (NETWORK, SUBNET, PORT).
    :param operation: The operation that invokes notification (CREATE,
                      DELETE, UPDATE)
    :param host: Add host to the topic
    :returns: The topic name.
    """
    if host:
        return '%s-%s-%s.%s' % (prefix, table, operation, host)
    return '%s-%s-%s' % (prefix, table, operation)
The /neutron/common/topics.py file defines the following constants.
#/neutron/common/topics.py
NETWORK = 'network'
SUBNET = 'subnet'
PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
DVR = 'dvr'

CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'

AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
L3PLUGIN = 'q-l3-plugin'
DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin'
LOADBALANCER_PLUGIN = 'n-lbaas-plugin'

L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent'
LOADBALANCER_AGENT = 'n-lbaas_agent'
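Plugging these constants into get_topic_name yields the concrete queue names, for example:

from neutron.common import topics

topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE)
# -> 'q-agent-notifier-port-update'
topics.get_topic_name(topics.AGENT, topics.L2POPULATION, topics.UPDATE,
                      host='jun2')
# -> 'q-agent-notifier-l2population-update.jun2'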
So the AgentNotifierApi object establishes, over AMQP, a binding to the RPC server whose topic is 'q-agent-notifier'; that is, AgentNotifierApi can remotely call functions on that RPC server. In practice, however, it never calls the plain 'q-agent-notifier' topic. Instead it uses the prepare function to build new Targets with modified topics and calls RPC servers on three other topics (constructed with get_topic_name), as sketched after the list:

'q-agent-notifier-network-delete'
'q-agent-notifier-port-update'
'q-agent-notifier-port-delete'
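For instance, the network_delete notification method on AgentNotifierApi follows this pattern, essentially like this (sketched from the Kilo code; treat the details as approximate):

#/neutron/plugins/ml2/rpc.py:AgentNotifierApi (sketch)
def network_delete(self, context, network_id):
    # re-target the client at 'q-agent-notifier-network-delete' and
    # fanout-cast to every listening agent
    cctxt = self.client.prepare(topic=self.topic_network_delete,
                                fanout=True)
    cctxt.cast(context, 'network_delete', network_id=network_id)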
So which service builds the RPC servers for these three topics? Judging from AgentNotifierApi's docstring, it is the neutron-openvswitch-agent service, as shown below.
#/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent
def setup_rpc(self):
    self.agent_id = 'ovs-agent-%s' % cfg.CONF.host
    self.topic = topics.AGENT
    self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
    self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
    self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

    # RPC network init
    self.context = context.get_admin_context_without_session()
    # Handle updates from service
    self.endpoints = [self]
    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.PORT, topics.DELETE],
                 [topics.NETWORK, topics.DELETE],
                 [constants.TUNNEL, topics.UPDATE],
                 [constants.TUNNEL, topics.DELETE],
                 [topics.SECURITY_GROUP, topics.UPDATE],
                 [topics.DVR, topics.UPDATE]]
    if self.l2_pop:
        consumers.append([topics.L2POPULATION,
                          topics.UPDATE, cfg.CONF.host])
    self.connection = agent_rpc.create_consumers(self.endpoints,
                                                 self.topic,
                                                 consumers,
                                                 start_listening=False)

#/neutron/agent/rpc.py
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
    """Create agent RPC consumers.

    :param endpoints: The list of endpoints to process the incoming messages.
    :param prefix: Common prefix for the plugin/agent message queues.
    :param topic_details: A list of topics. Each topic has a name, an
                          operation, and an optional host param keying the
                          subscription to topic.host for plugin calls.
    :param start_listening: if True, it starts the processing loop

    :returns: A common Connection.
    """
    connection = n_rpc.create_connection(new=True)
    for details in topic_details:
        topic, operation, node_name = itertools.islice(
            itertools.chain(details, [None]), 3)

        topic_name = topics.get_topic_name(prefix, topic, operation)
        connection.create_consumer(topic_name, endpoints, fanout=True)
        if node_name:
            node_topic_name = '%s.%s' % (topic_name, node_name)
            connection.create_consumer(node_topic_name,
                                       endpoints,
                                       fanout=False)
    if start_listening:
        connection.consume_in_threads()

    return connection
From the code above, the RPC server built by neutron-openvswitch-agent listens on at most 8 topics:

'q-agent-notifier-port-update'
'q-agent-notifier-port-delete'
'q-agent-notifier-network-delete'
'q-agent-notifier-tunnel-update'
'q-agent-notifier-tunnel-delete'
'q-agent-notifier-security_group-update'
'q-agent-notifier-dvr-update'
'q-agent-notifier-l2population-update.jun2'
These 8 topics cover all the topics needed by the RPC client built in AgentNotifierApi.

However, my OpenStack environment uses the linuxbridge mechanism driver, and when the neutron-linuxbridge-agent service starts, the RPC server it creates has no Target with topic 'q-agent-notifier-port-delete'. So with the linuxbridge mechanism driver, the AgentNotifierApi object cannot call the port_delete function. See below.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def setup_rpc(self, physical_interfaces):
    if physical_interfaces:
        mac = utils.get_interface_mac(physical_interfaces[0])
    else:
        devices = ip_lib.IPWrapper().get_devices(True)
        if devices:
            mac = utils.get_interface_mac(devices[0].name)
        else:
            LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                          "Agent terminated!"))
            exit(1)
    self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
    LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

    self.topic = topics.AGENT
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
    # RPC network init
    # Handle updates from service
    self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
                                              self.sg_agent)]
    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.NETWORK, topics.DELETE],
                 [topics.SECURITY_GROUP, topics.UPDATE]]
    if cfg.CONF.VXLAN.l2_population:
        consumers.append([topics.L2POPULATION,
                          topics.UPDATE, cfg.CONF.host])
    self.connection = agent_rpc.create_consumers(self.endpoints,
                                                 self.topic,
                                                 consumers)
    report_interval = cfg.CONF.AGENT.report_interval
    if report_interval:
        heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        heartbeat.start(interval=report_interval)
Next we analyze the creation of the other, DHCP-related RPC client:

self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
    dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
The self.agent_notifiers attribute is inherited from the parent class /neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin.
#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }
The DhcpAgentNotifyAPI object is created as follows.
#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py:DhcpAgentNotifyAPI
class DhcpAgentNotifyAPI(object):
    """API for plugin to notify DHCP agent.

    This class implements the client side of an rpc interface.  The server
    side is neutron.agent.dhcp_agent.DhcpAgent.  For more information about
    changing rpc interfaces, please see doc/source/devref/rpc_api.rst.
    """
    # It seems dhcp agent does not support bulk operation
    VALID_RESOURCES = ['network', 'subnet', 'port']
    VALID_METHOD_NAMES = ['network.create.end',
                          'network.update.end',
                          'network.delete.end',
                          'subnet.create.end',
                          'subnet.update.end',
                          'subnet.delete.end',
                          'port.create.end',
                          'port.update.end',
                          'port.delete.end']

    def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
        self._plugin = plugin
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
The topic of this RPC client is 'dhcp_agent'. The corresponding RPC server is created when the neutron-dhcp-agent service starts.
#/neutron/agent/dhcp_agent.py
def main():
    register_options()
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(server).wait()

#/neutron/service.py:Service
def start(self):
    self.manager.init_host()
    super(Service, self).start()
    if self.report_interval:
        pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
        pulse.start(interval=self.report_interval,
                    initial_delay=self.report_interval)
        self.timers.append(pulse)

#/neutron/common/rpc.py:Service
def start(self):
    super(Service, self).start()
    self.conn = create_connection(new=True)
    LOG.debug("Creating Consumer connection for Service %s",
              self.topic)

    endpoints = [self.manager]

    self.conn.create_consumer(self.topic, endpoints)

    # Hook to allow the manager to do other initializations after
    # the rpc connection is created.
    if callable(getattr(self.manager, 'initialize_service_hook', None)):
        self.manager.initialize_service_hook(self)

    # Consume from all consumers in threads
    self.conn.consume_in_threads()

    if self.periodic_interval:
        if self.periodic_fuzzy_delay:
            initial_delay = random.randint(0, self.periodic_fuzzy_delay)
        else:
            initial_delay = None

        periodic = loopingcall.FixedIntervalLoopingCall(
            self.periodic_tasks)
        periodic.start(interval=self.periodic_interval,
                       initial_delay=initial_delay)
        self.timers.append(periodic)
    self.manager.after_start()
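For completeness, here is a hedged sketch of how a cast over the 'dhcp_agent' topic looks from the DhcpAgentNotifyAPI side (the helper name and exact signature are simplified, not the verbatim Kilo code):

#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py (simplified sketch)
def _cast_message(self, context, method, payload, host):
    # topic 'dhcp_agent' is already set on the Target; server=host narrows
    # the message to the agent on one host ('dhcp_agent.<host>')
    cctxt = self.client.prepare(server=host)
    cctxt.cast(context, method, payload=payload)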
To summarize, the correspondence between the core plugin's RPC clients and the RPC servers described above is:
| RPC-client class | RPC-client service | RPC-server endpoints | RPC-server topic | RPC-server service |
| --- | --- | --- | --- | --- |
| AgentNotifierApi | neutron-server (core plugin) | OVSNeutronAgent | q-agent-notifier-xxx-yyy (topics.AGENT-xxx-yyy) | neutron-openvswitch-agent |
| AgentNotifierApi | neutron-server (core plugin) | LinuxBridgeRpcCallbacks | q-agent-notifier-xxx-yyy (topics.AGENT-xxx-yyy) | neutron-linuxbridge-agent |
| DhcpAgentNotifyAPI | neutron-server (core plugin) | DhcpAgentWithStateReport (inherits from DhcpAgent) | dhcp_agent (topics.DHCP_AGENT) | neutron-dhcp-agent |
Here xxx denotes a resource (e.g. port, network) and yyy an operation (e.g. update, delete).
This completes the analysis of _setup_rpc.

The following code remains in Ml2Plugin's __init__ function.
#/neutron/plugins/ml2/plugin.py:Ml2Plugin.__init__
    # REVISIT(rkukura): Use stevedore for these?
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver
    )

    self.start_periodic_dhcp_agent_status_check()
    LOG.info(_LI("Modular L2 Plugin initialization complete"))
network_scheduler_driver is a parameter in neutron.conf whose default value is:

network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler
So self.network_scheduler is a /neutron/scheduler/dhcp_agent_scheduler.py:ChanceScheduler object.

start_periodic_dhcp_agent_status_check starts a periodic status-check task.
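import_object simply imports the named class and instantiates it; a minimal sketch:

from oslo_utils import importutils

# Equivalent to:
#   from neutron.scheduler.dhcp_agent_scheduler import ChanceScheduler
#   scheduler = ChanceScheduler()
scheduler = importutils.import_object(
    'neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler')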
This completes the core plugin code flow. Next, the service plugin code flow.
#/neutron/manager.py:NeutronManager.__init__
    # core plugin as a part of plugin collection simplifies
    # checking extensions
    # TODO(enikanorov): make core plugin the same as
    # the rest of service plugins
    self.service_plugins = {constants.CORE: self.plugin}
    self._load_service_plugins()
The core plugin is first loaded into self.service_plugins; then _load_service_plugins loads the actual service plugins into the same dict.
#/neutron/manager.py:NeutronManager
def _load_service_plugins(self):
    """Loads service plugins.

    Starts from the core plugin and checks if it supports
    advanced services then loads classes provided in configuration.
    """
    # load services from the core plugin first
    self._load_services_from_core_plugin()

    plugin_providers = cfg.CONF.service_plugins
    LOG.debug("Loading service plugins: %s", plugin_providers)
    for provider in plugin_providers:
        if provider == '':
            continue

        LOG.info(_LI("Loading Plugin: %s"), provider)
        plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                provider)

        # only one implementation of svc_type allowed
        # specifying more than one plugin
        # for the same type is a fatal exception
        if plugin_inst.get_plugin_type() in self.service_plugins:
            raise ValueError(_("Multiple plugins for service "
                               "%s were configured") %
                             plugin_inst.get_plugin_type())

        self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

        # search for possible agent notifiers declared in service plugin
        # (needed by agent management extension)
        if (hasattr(self.plugin, 'agent_notifiers') and
                hasattr(plugin_inst, 'agent_notifiers')):
            self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)

        LOG.debug("Successfully loaded %(type)s plugin. "
                  "Description: %(desc)s",
                  {"type": plugin_inst.get_plugin_type(),
                   "desc": plugin_inst.get_plugin_description()})
First, service plugins are loaded from the core plugin.
#/neutron/manager.py:NeutronManager
def _load_services_from_core_plugin(self):
    """Puts core plugin in service_plugins for supported services."""
    LOG.debug("Loading services supported by the core plugin")

    # supported service types are derived from supported extensions
    for ext_alias in getattr(self.plugin,
                             "supported_extension_aliases", []):
        if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
            service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
            self.service_plugins[service_type] = self.plugin
            LOG.info(_LI("Service %s is supported by the core plugin"),
                     service_type)
EXT_TO_SERVICE_MAPPING contains the following.
#/neutron/plugins/common/constants.py
# service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"

# maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
    'dummy': DUMMY,
    'lbaas': LOADBALANCER,
    'lbaasv2': LOADBALANCERV2,
    'fwaas': FIREWALL,
    'vpnaas': VPN,
    'metering': METERING,
    'router': L3_ROUTER_NAT
}
That is, the aliases returned by the core plugin's (the Ml2Plugin object's) supported_extension_aliases property are matched against EXT_TO_SERVICE_MAPPING; for every match, the core plugin itself is also registered as that service plugin.
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    # List of supported extensions
    _supported_extension_aliases = ["provider", "external-net", "binding",
                                    "quotas", "security-group", "agent",
                                    "dhcp_agent_scheduler",
                                    "multi-provider", "allowed-address-pairs",
                                    "extra_dhcp_opt", "subnet_allocation",
                                    "net-mtu", "vlan-transparent"]

    @property
    def supported_extension_aliases(self):
        if not hasattr(self, '_aliases'):
            aliases = self._supported_extension_aliases[:]
            aliases += self.extension_manager.extension_aliases()
            sg_rpc.disable_security_group_extension_by_config(aliases)
            vlantransparent.disable_extension_by_config(aliases)
            self._aliases = aliases
        return self._aliases
supported_extension_aliases assigns the aliases in the _supported_extension_aliases list to self._aliases. The list contains 13 aliases, and the property decides whether to remove the 'security-group' and 'vlan-transparent' aliases. Take security-group as an example.
#/neutron/agent/securitygroups_rpc.py
# This is backward compatibility check for Havana
def _is_valid_driver_combination():
    return ((cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver and
              cfg.CONF.SECURITYGROUP.firewall_driver !=
              'neutron.agent.firewall.NoopFirewallDriver')) or
            (not cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver ==
              'neutron.agent.firewall.NoopFirewallDriver' or
              cfg.CONF.SECURITYGROUP.firewall_driver is None)
             ))

def is_firewall_enabled():
    if not _is_valid_driver_combination():
        LOG.warn(_LW("Driver configuration doesn't match with "
                     "enable_security_group"))
    return cfg.CONF.SECURITYGROUP.enable_security_group

def _disable_extension(extension, aliases):
    if extension in aliases:
        aliases.remove(extension)

def disable_security_group_extension_by_config(aliases):
    if not is_firewall_enabled():
        LOG.info(_LI('Disabled security-group extension.'))
        _disable_extension('security-group', aliases)
        LOG.info(_LI('Disabled allowed-address-pairs extension.'))
        _disable_extension('allowed-address-pairs', aliases)
Because the enable_security_group parameter in /etc/neutron/plugins/ml2/ml2_conf.ini is set as follows:

enable_security_group = True
is_firewall_enabled returns True, so disable_security_group_extension_by_config does not remove the 'security-group' alias. Note that _is_valid_driver_combination checks whether the security group configuration parameters are consistent; the valid combinations are:
| enable_security_group | firewall_driver |
| --- | --- |
| True | firewall_driver != 'neutron.agent.firewall.NoopFirewallDriver' |
| False | firewall_driver == 'neutron.agent.firewall.NoopFirewallDriver' or firewall_driver is None |
Removing the 'vlan-transparent' alias is simpler.
#/neutron/extensions/vlantransparent.py
def disable_extension_by_config(aliases):
    if not cfg.CONF.vlan_transparent:
        if 'vlan-transparent' in aliases:
            aliases.remove('vlan-transparent')
        LOG.info(_LI('Disabled vlantransparent extension.'))
vlan_transparent is a parameter in neutron.conf, here set to:

vlan_transparent = False
So the 'vlan-transparent' alias is removed.

In the end, supported_extension_aliases returns 12 aliases:
['provider', 'external-net', 'binding', 'quotas', 'security-group', 'agent',
 'dhcp_agent_scheduler', 'multi-provider', 'allowed-address-pairs',
 'extra_dhcp_opt', 'subnet_allocation', 'net-mtu']
None of them match an alias in EXT_TO_SERVICE_MAPPING, so after _load_services_from_core_plugin runs, the service plugin dict still contains only the core plugin.

Back in /neutron/manager.py:NeutronManager._load_service_plugins: after _load_services_from_core_plugin finishes, the service plugins named by the service_plugins parameter in neutron.conf are loaded and instantiated.
service_plugins = neutron.services.l3_router.l3_router_plugin.L3RouterPlugin
So the final value of self.service_plugins is:

{'L3_ROUTER_NAT': <neutron.services.l3_router.l3_router_plugin.L3RouterPlugin object at 0x42038d0>,
 'CORE': <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x360d910>}
The L3_ROUTER_NAT key is obtained from get_plugin_type.
#/neutron/plugins/common/constants.py
L3_ROUTER_NAT = "L3_ROUTER_NAT"

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
def get_plugin_type(self):
    return constants.L3_ROUTER_NAT
Next we analyze the creation of the L3RouterPlugin service plugin object.
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
                     extraroute_db.ExtraRoute_db_mixin,
                     l3_hamode_db.L3_HA_NAT_db_mixin,
                     l3_gwmode_db.L3_NAT_db_mixin,
                     l3_dvrscheduler_db.L3_DVRsch_db_mixin,
                     l3_hascheduler_db.L3_HA_scheduler_db_mixin):
    """Implementation of the Neutron L3 Router Service Plugin.

    This class implements a L3 service plugin that provides
    router and floatingip resources and manages associated
    request/response.
    All DB related work is implemented in classes
    l3_db.L3_NAT_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin,
    l3_dvr_db.L3_NAT_with_dvr_db_mixin, and extraroute_db.ExtraRoute_db_mixin.
    """
    supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
                                   "extraroute", "l3_agent_scheduler",
                                   "l3-ha"]

    def __init__(self):
        self.setup_rpc()
        self.router_scheduler = importutils.import_object(
            cfg.CONF.router_scheduler_driver)
        self.start_periodic_l3_agent_status_check()
        super(L3RouterPlugin, self).__init__()
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()
First, the RPC client is created.
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
def setup_rpc(self):
    # RPC support
    self.topic = topics.L3PLUGIN
    self.conn = n_rpc.create_connection(new=True)
    self.agent_notifiers.update(
        {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
    self.endpoints = [l3_rpc.L3RpcCallback()]
    self.conn.create_consumer(self.topic, self.endpoints,
                              fanout=False)
    self.conn.consume_in_threads()
where

#/neutron/common/topics.py
L3PLUGIN = 'q-l3-plugin'
The self.agent_notifiers attribute is again inherited from the parent class /neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin.
#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }
The AGENT_TYPE_L3 entry in self.agent_notifiers is an L3AgentNotifyAPI object:
#/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py:L3AgentNotifyAPI
class L3AgentNotifyAPI(object):
    """API for plugin to notify L3 agent."""

    def __init__(self, topic=topics.L3_AGENT):
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
The L3AgentNotifyAPI object creates an RPC client with topic L3_AGENT = 'l3_agent'. The RPC server with that topic is created when the neutron-l3-agent service starts.
#/neutron/agent/l3_agent.py
def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
    service.launch(server).wait()

#/neutron/service.py:Service
def start(self):
    self.manager.init_host()
    super(Service, self).start()
    if self.report_interval:
        pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
        pulse.start(interval=self.report_interval,
                    initial_delay=self.report_interval)
        self.timers.append(pulse)

#/neutron/common/rpc.py:Service
def start(self):
    super(Service, self).start()
    self.conn = create_connection(new=True)
    LOG.debug("Creating Consumer connection for Service %s",
              self.topic)

    endpoints = [self.manager]

    self.conn.create_consumer(self.topic, endpoints)

    # Hook to allow the manager to do other initializations after
    # the rpc connection is created.
    if callable(getattr(self.manager, 'initialize_service_hook', None)):
        self.manager.initialize_service_hook(self)

    # Consume from all consumers in threads
    self.conn.consume_in_threads()

    if self.periodic_interval:
        if self.periodic_fuzzy_delay:
            initial_delay = random.randint(0, self.periodic_fuzzy_delay)
        else:
            initial_delay = None

        periodic = loopingcall.FixedIntervalLoopingCall(
            self.periodic_tasks)
        periodic.start(interval=self.periodic_interval,
                       initial_delay=initial_delay)
        self.timers.append(periodic)
    self.manager.after_start()
setup_rpc also creates an RPC server with topic L3PLUGIN = 'q-l3-plugin'.
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.setup_rpc
    self.endpoints = [l3_rpc.L3RpcCallback()]
    self.conn.create_consumer(self.topic, self.endpoints,
                              fanout=False)
    self.conn.consume_in_threads()
Its endpoint is a /neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback object. On the agent side, /neutron/agent/l3/agent.py:L3PluginApi creates the matching RPC client with topic L3PLUGIN = 'q-l3-plugin' (when the neutron-l3-agent service starts) and uses it to call the functions of the L3RpcCallback class.
#/neutron/agent/l3/agent.py:L3PluginApi
class L3PluginApi(object):
    """Agent side of the l3 agent RPC API.

    API version history:
        1.0 - Initial version.
        1.1 - Floating IP operational status updates
        1.2 - DVR support: new L3 plugin methods added.
              - get_ports_by_subnet
              - get_agent_gateway_port
              Needed by the agent when operating in DVR/DVR_SNAT mode
        1.3 - Get the list of activated services
        1.4 - Added L3 HA update_router_state. This method was reworked in
              to update_ha_routers_states
        1.5 - Added update_ha_routers_states
    """

    def __init__(self, topic, host):
        self.host = host
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/agent/l3/agent.py:L3NATAgent
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                 ha.AgentMixin,
                 dvr.AgentMixin,
                 manager.Manager):
    """Manager for L3NatAgent

        API version history:
        1.0 initial Version
        1.1 changed the type of the routers parameter
            to the routers_updated method.
            It was previously a list of routers in dict format.
            It is now a list of router IDs only.
            Per rpc versioning rules, it is backwards compatible.
        1.2 - DVR support: new L3 agent methods added.
              - add_arp_entry
              - del_arp_entry
              Needed by the L3 service when dealing with DVR
    """
    target = oslo_messaging.Target(version='1.2')

    def __init__(self, host, conf=None):
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        self.router_info = {}

        self._check_config_params()

        self.process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='router')
        ...
        self.context = n_context.get_admin_context_without_session()
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        self.fullsync = True
        ...
        self._queue = queue.RouterProcessingQueue()
        super(L3NATAgent, self).__init__(conf=self.conf)

        self.target_ex_net_id = None
        self.use_ipv6 = ipv6_utils.is_enabled()

        if self.conf.enable_metadata_proxy:
            self.metadata_driver = metadata_driver.MetadataDriver(self)

#/neutron/agent/l3/agent.py:L3NATAgentWithStateReport
class L3NATAgentWithStateReport(L3NATAgent):

    def __init__(self, host, conf=None):
        self.use_call = True
        super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        self.agent_state = {
            'binary': 'neutron-l3-agent',
            'host': host,
            'topic': topics.L3_AGENT,
            'configurations': {
                'agent_mode': self.conf.agent_mode,
                'use_namespaces': self.conf.use_namespaces,
                'router_id': self.conf.router_id,
                'handle_internal_only_routers':
                self.conf.handle_internal_only_routers,
                'external_network_bridge': self.conf.external_network_bridge,
                'gateway_external_network_id':
                self.conf.gateway_external_network_id,
                'interface_driver': self.conf.interface_driver},
            'start_flag': True,
            'agent_type': l3_constants.AGENT_TYPE_L3}
        report_interval = self.conf.AGENT.report_interval
        if report_interval:
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            self.heartbeat.start(interval=report_interval)
When the neutron-l3-agent service starts, it creates an L3NATAgentWithStateReport object, which initializes its parent class L3NATAgent; L3NATAgent constructs an L3PluginApi object (self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)), so L3PluginApi creates the RPC client with topic L3PLUGIN = 'q-l3-plugin'.
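As an example of this client in use, the agent's full-sync path fetches router data through it, essentially like this (sketched from the Kilo code; treat the details as approximate):

#/neutron/agent/l3/agent.py:L3PluginApi (sketch)
def get_routers(self, context, router_ids=None):
    """Make a remote process call to retrieve the sync data for routers."""
    cctxt = self.client.prepare()
    # invokes L3RpcCallback.sync_routers on the neutron-server side
    return cctxt.call(context, 'sync_routers', host=self.host,
                      router_ids=router_ids)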
Here is a summary of the service plugin's RPC setup.
| RPC-client class | RPC-client service | RPC-server endpoints | RPC-server topic | RPC-server service |
| --- | --- | --- | --- | --- |
| L3AgentNotifyAPI | neutron-server (service plugin) | L3NATAgentWithStateReport (inherits from L3NATAgent) | l3_agent (topics.L3_AGENT) | neutron-l3-agent |
| L3PluginApi | neutron-l3-agent | L3RpcCallback | q-l3-plugin (topics.L3PLUGIN) | neutron-server |
This completes the analysis of setup_rpc in L3RouterPlugin's __init__; the RPC pieces it creates are summarized in the table above. One important topic remains in L3RouterPlugin's __init__: the Neutron Callback System. __init__ merely registers some callback functions; registry.notify is called at the appropriate places to execute them. See below.
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.__init__
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()
I'll use l3_db.subscribe() as the example.
#/neutron/db/l3_db.py
def subscribe():
    registry.subscribe(
        _prevent_l3_port_delete_callback, resources.PORT,
        events.BEFORE_DELETE)
    registry.subscribe(
        _notify_routers_callback, resources.PORT, events.AFTER_DELETE)

# NOTE(armax): multiple l3 service plugins (potentially out of tree) inherit
# from l3_db and may need the callbacks to be processed. Having an implicit
# subscription (through the module import) preserves the existing behavior,
# and at the same time it avoids fixing it manually in each and every l3 plugin
# out there. That said, The subscription is also made explicit in the
# reference l3 plugin. The subscription operation is idempotent so there is no
# harm in registering the same callback multiple times.
subscribe()
First, how does subscribe work?
#/neutron/callbacks/registry.py
def _get_callback_manager():
    global CALLBACK_MANAGER
    if CALLBACK_MANAGER is None:
        CALLBACK_MANAGER = manager.CallbacksManager()
    return CALLBACK_MANAGER

#/neutron/callbacks/registry.py
def subscribe(callback, resource, event):
    _get_callback_manager().subscribe(callback, resource, event)

#/neutron/callbacks/manager.py:CallbacksManager
def subscribe(self, callback, resource, event):
    """Subscribe callback for a resource event.

    The same callback may register for more than one event.

    :param callback: the callback. It must raise or return a boolean.
    :param resource: the resource. It must be a valid resource.
    :param event: the event. It must be a valid event.
    """
    LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s",
              {'callback': callback, 'resource': resource, 'event': event})
    if resource not in resources.VALID:
        raise exceptions.Invalid(element='resource', value=resource)
    if event not in events.VALID:
        raise exceptions.Invalid(element='event', value=event)

    callback_id = _get_id(callback)
    self._callbacks[resource][event][callback_id] = weakref.proxy(callback)
    # We keep a copy of callbacks to speed the unsubscribe operation.
    if callback_id not in self._index:
        self._index[callback_id] = collections.defaultdict(set)
    self._index[callback_id][resource].add(event)
subscribe identifies a callback by three attributes: resource, event, and callback_id, and saves the subscribed callbacks in the self._callbacks dict. The valid resources and events are defined in the following files.
#/neutron/callbacks/resources.py
PORT = 'port'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
ROUTER_INTERFACE = 'router_interface'
SECURITY_GROUP = 'security_group'
SECURITY_GROUP_RULE = 'security_group_rule'

VALID = (
    PORT,
    ROUTER,
    ROUTER_GATEWAY,
    ROUTER_INTERFACE,
    SECURITY_GROUP,
    SECURITY_GROUP_RULE,
)

#/neutron/callbacks/events.py
BEFORE_CREATE = 'before_create'
BEFORE_READ = 'before_read'
BEFORE_UPDATE = 'before_update'
BEFORE_DELETE = 'before_delete'

AFTER_CREATE = 'after_create'
AFTER_READ = 'after_read'
AFTER_UPDATE = 'after_update'
AFTER_DELETE = 'after_delete'

ABORT_CREATE = 'abort_create'
ABORT_READ = 'abort_read'
ABORT_UPDATE = 'abort_update'
ABORT_DELETE = 'abort_delete'

ABORT = 'abort_'
BEFORE = 'before_'

VALID = (
    BEFORE_CREATE,
    BEFORE_READ,
    BEFORE_UPDATE,
    BEFORE_DELETE,
    AFTER_CREATE,
    AFTER_READ,
    AFTER_UPDATE,
    AFTER_DELETE,
    ABORT_CREATE,
    ABORT_READ,
    ABORT_UPDATE,
    ABORT_DELETE,
)
When the subscribed callbacks need to be invoked, registry.notify is called:
#/neutron/callbacks/registry.py
def notify(resource, event, trigger, **kwargs):
    _get_callback_manager().notify(resource, event, trigger, **kwargs)

#/neutron/callbacks/manager.py:CallbacksManager
def notify(self, resource, event, trigger, **kwargs):
    """Notify all subscribed callback(s).

    Dispatch the resource's event to the subscribed callbacks.

    :param resource: the resource.
    :param event: the event.
    :param trigger: the trigger. A reference to the sender of the event.
    """
    errors = self._notify_loop(resource, event, trigger, **kwargs)
    if errors and event.startswith(events.BEFORE):
        abort_event = event.replace(
            events.BEFORE, events.ABORT)
        self._notify_loop(resource, abort_event, trigger)
        raise exceptions.CallbackFailure(errors=errors)

#/neutron/callbacks/manager.py:CallbacksManager
def _notify_loop(self, resource, event, trigger, **kwargs):
    """The notification loop."""
    LOG.info(_LI("Notify callbacks for %(resource)s, %(event)s"),
             {'resource': resource, 'event': event})
    errors = []
    # TODO(armax): consider using a GreenPile
    for callback_id, callback in self._callbacks[resource][event].items():
        try:
            LOG.info(_LI("Calling callback %s"), callback_id)
            callback(resource, event, trigger, **kwargs)
        except Exception as e:
            LOG.exception(_LE("Error during notification for "
                              "%(callback)s %(resource)s, %(event)s"),
                          {'callback': callback_id,
                           'resource': resource, 'event': event})
            errors.append(exceptions.NotificationError(callback_id, e))
    return errors
Ultimately the _notify_loop function in /neutron/callbacks/manager.py:CallbacksManager executes the callbacks.
Below is the example from the official documentation.
from neutron.callbacks import events
from neutron.callbacks import resources
from neutron.callbacks import registry

def callback1(resource, event, trigger, **kwargs):
    print 'Callback1 called by trigger: ', trigger
    print 'kwargs: ', kwargs

def callback2(resource, event, trigger, **kwargs):
    print 'Callback2 called by trigger: ', trigger
    print 'kwargs: ', kwargs

# B and C express interest with I
registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE)
registry.subscribe(callback2, resources.ROUTER, events.BEFORE_CREATE)
print 'Subscribed'

# A notifies
def do_notify():
    kwargs = {'foo': 'bar'}
    registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify,
                    **kwargs)

print 'Notifying...'
do_notify()
Running this code produces the following output:
Subscribed
Notifying...
Callback1 called by trigger:  <function do_notify at 0x1e86578>
kwargs:  {'foo': 'bar'}
Callback2 called by trigger:  <function do_notify at 0x1e86578>
kwargs:  {'foo': 'bar'}
Because callback1 and callback2 were both subscribed with the same resource (resources.ROUTER) and the same event (events.BEFORE_CREATE), the for loop in _notify_loop ends up executing both callback1 and callback2.
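One more detail worth noting from notify() above: if a callback subscribed to a BEFORE_* event raises, the manager fires the matching ABORT_* event and raises CallbackFailure. A minimal sketch based on the code shown earlier:

from neutron.callbacks import events
from neutron.callbacks import exceptions
from neutron.callbacks import registry
from neutron.callbacks import resources

def veto(resource, event, trigger, **kwargs):
    raise Exception('not allowed')

registry.subscribe(veto, resources.ROUTER, events.BEFORE_CREATE)

try:
    registry.notify(resources.ROUTER, events.BEFORE_CREATE, None)
except exceptions.CallbackFailure as e:
    # any callbacks subscribed to ABORT_CREATE have already been run
    print 'aborted: ', e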
This completes the code-flow analysis of the creation of the core plugin and the service plugins. To summarize:
Creating the core plugin involves creating the type drivers (e.g. vlan, flat) managed by type_manager, the mechanism drivers (e.g. linuxbridge, openvswitch) managed by mechanism_manager, and the extension drivers managed by extension_manager (my OpenStack environment does not use an extension driver), plus creating the RPC clients.
Creating the service plugins involves creating RPC clients and RPC servers, as well as registering some callback functions with the Neutron Callback System.
Finally, a summary of the relationship between core resources (core API) and extension resources (extension API).
Similar to the way nova manages its API resources, neutron divides the API resources derived from its various network abstractions into core resources (core API) and extension resources (extension API). The core API corresponds only to the four L2 resources network/subnet/port/subnetpool (the last added in the Kilo release); the abstractions of all the other layers fall under the extension API.
The extension API extends the existing resources in two ways: by adding attributes to network/subnet/port/subnetpool (e.g. the port binding extension), or by adding entirely new resources (e.g. VPNaaS).
Since the neutron-server code flow is extensive, extension resources and the remaining RPC setup will be analyzed in the next article.
PS: since 《nova boot代码流程分析(三):nova与neutron的交互》 is not yet finished, the remaining analysis of the interaction between nova and neutron will follow once the neutron-server analysis is complete.
Original article: http://blog.csdn.net/gj19890923/article/details/51333899