标签:ext 有感 href end 插件 lib base pull 必须
文本介绍的系统与以上不同,本系统旨在通过消息扇出机制(fanout mechanism)实现进程间的回调。
在Neutron中,代理可能需要订阅特定的资源细节,这些细节可能会随着时间而改变。此消息回调系统的目的是允许代理订阅这些资源,而无需扩展修改现有的RPC调用,或创建新的RPC消息。
一些可以从这个回调系统中受益的资源:
使用远程发布者/订阅者模型,可以使用扇出fanout消息将资源信息发布到所有感兴趣的节点,最小化从代理到服务器的消息请求,因为代理订阅维持在其整个生命周期(除非它们取消订阅)。
在一个代理中,可能有多个订阅者回调对应着同一个资源事件,资源更新将通过单条消息被调度到多个订阅者回调。任何更新都会以一条消息出现,为每个接收代理agent仅执行一次oslo版本化对象的反串行化。
此发布/订阅机制高度依赖于传输资源的格式。这就是为什么oslo库只允许发布和订阅的版本化对象。olso版本化对象允许对象版本向下/向上转换。参见本文附录的:vo_mkcompat。
有关VO(Versioned Objects)的版本控制模式,请查看文末链接:[VO_versioning]。
版本化对象的序列化/反序列化函数obj_to_primitive(target_version=…) 和 primitive_to_obj() 在内部用于消息传递之前/之后转换/获取对象,这两个函数的详细解释参见文末的链接 [ov_serdes]。
序列化的版本化对象如下所示:
{‘versioned_object.version‘: ‘1.0‘,
‘versioned_object.name‘: ‘QoSPolicy‘,
‘versioned_object.data‘: {‘rules‘: [
{‘versioned_object.version‘: ‘1.0‘,
‘versioned_object.name‘: ‘QoSBandwidthLimitRule‘,
‘versioned_object.data‘: {‘name‘: u‘a‘},
‘versioned_object.namespace‘: ‘versionedobjects‘}
],
‘uuid‘: u‘abcde‘,
‘name‘: u‘aaa‘},
‘versioned_object.namespace‘: ‘versionedobjects‘}
在本节中,我们假设标准Neutron升级过程,这意味着先升级服务器,然后升级代理。
我们提供了一种自动方法,避免了由于管理员手动固定版本和取消固定版本,可能会引入的错误。
资源拉取请求将始终正常工作,因为基础资源RPC仍然提供请求的资源ID/IDs的版本。服务端将首先升级,这样它将始终能够满足代理的任何版本请求。
代理将订阅neutron-vo-<resource_type>-扇出队列,它为它们所知道的版本携带更新的对象。它们所知道的版本依赖于它们开始运行时的Neutron版本化对象。
当服务端升级时,它应能够立即统计每个对象的代理版本(稍后章节我们将为此定义一个机制)。它将使用此统计信息为资源类型具有的所有版本发送扇出消息。
例如,如果neutron-server知道它有资源类型"A"的rpc-callback回调的1.0版本代理,和1.2版本代理,任何更新将发送neutron-vo-A_1.0 和 neutron-vo-A_1.2。
TODO待完成:验证升级完成后,任何未使用的消息资源(queues, exchanges,等)当旧的代理离开时被释放,并且neutron-server停止产生新的信息广播。否则记录如果我们完成了滚动升级后,需要清除队列queues,就要重新启动neutron-server。
我们在代理数据库中添加一行以跟踪代理已知对象和版本号。这类似于数据库配置列的实现。
代理在开始时不仅会立即报告其配置,而且还会报告它们订阅的对象类型/版本对,并存储在数据库中,提供给任何请求它的neutron-server:
‘resource_versions‘: {‘QosPolicy‘: ‘1.1‘,
‘SecurityGroup‘: ‘1.0‘,
‘Port‘: ‘1.0‘}
有一部分Liberty代理依赖于“QosPolicy”:如果qos插件已安装,要求’QosPolicy’: ‘1.0’版本。我们能够按二进制名称识别它们(包括在报告中):
这个转换是在Mitaka版本中处理的,但没有在Newton版本中处理,因为只支持一个主版本的升级。
在上述机制的作用下,考虑需要QoSpolicy 1.0的neutron-openvswitch-agent 和 neutron-sriov-agent,我们发现每个推送通知中都要发送的版本子集。
处于关闭状态的代理将从该计算中排除。在此计算中,我们为代理使用扩展的超时时间来确保安全,特别是如果部署人员将代理标记为低超时时间。
从Mitaka版本开始,任何通过此API对版本化对象感兴趣的代理应报告其感兴趣的资源/版本元组(它们订阅的资源类型/版本对)。
对该RPC机制感兴趣的插件必须继承AgentDbMixin,因为这个机制目前只用于代理,如果需要的话,它可以扩展到供其它组件使用。
AgentDbMixin提供:
def get_agents_resource_versions(self, tracker):
...
每个对象的版本子集被缓存,避免每次推送时发送DB请求,鉴于我们假设所有旧代理在升级时已完成注册。
在neutron.api.rpc.callbacks.version_manager.VERSIONS_TTL之后,重新计算缓存子集(以减少代理升级后的版本集)。
以下是在所有neutron-servers上快速更新此缓存的路径,即当升级后的代理出现(或者旧代理在长时间超时甚至降级后恢复)在注册新状态的服务器时,其通过广播通知其它的服务器关于此新代理的资源版本。
必须发送所有计算版本集的所有通知,否则未升级代理不会收到它们。
向任何扇出队列发送通知是安全的,因为如果没有代理在监听,它们将被丢弃。
neutron-vo-<resource_class_name>-
将来,我们可能希望oslo消息支持动态的订阅topics,那么我们可能需要使用:
neutron-vo-<resource_class_name>-<resource_id>-
或者类似的东西,可以为接收者提供精细的粒度,只为它们提供有用的信息。
假设你有一个代理A,它只需要处理一个新端口,其具有关联的安全组和QoS策略。
处理端口更新的代理代码可能如下所示:
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
def process_resource_updates(context, resource_type, resource_list, event_type):
# send to the right handler which will update any control plane
# details related to the updated resources...
def subscribe_resources():
registry.register(process_resource_updates, resources.SEC_GROUP)
registry.register(process_resource_updates, resources.QOS_POLICY)
def port_update(port):
# here we extract sg_id and qos_policy_id from port..
sec_group = registry.pull(resources.SEC_GROUP, sg_id)
qos_policy = registry.pull(resources.QOS_POLICY, qos_policy_id)
相关函数为:
回调函数将接收以下的参数:
对接收者上动态topics的底层oslo_messaging支持,我们不能按照每个"resource type + resource id"主题topic实现,rabbitmq似乎处理10000个topic而不受损,但在不同的topics上创造100个oslo_messaging似乎就崩溃了。
我们稍后可能会对此进行深入调查,以避免代理接收对它们来说是无意义的资源更新。
取消订阅注册的回调:
在服务器端,资源更新可以来自任何地方,一个服务插件,一个扩展,任何更新、创建或销毁资源,以及任何订阅代理感兴趣的事件。
期望由一个接收资源列表的回调方法。当在列表中的资源属于同一资源类型时,将发送单个push RPC消息;如果列表包含不同资源类型的对象,对每种类型的资源进行分组并单独发送,每种类型一个push RPC 消息。在接收端,列表中的资源始终属于同一类型。换句话说,服务器端推送异构对象列表将在总线上生成N条消息,并且触发N个客户端回调,其中N是资源列表中资源类型的数目,例如L(A、A、B、C、C、C)将分段为L1(A,A),L2(B),L3(C,C,C),每一个列表单独推送。
注:不保证单独资源列表将交付给消费者的顺序。
服务器/发布者 一侧如下所示:
from neutron.api.rpc.callbacks.producer import registry
from neutron.api.rpc.callbacks import events
def create_qos_policy(...):
policy = fetch_policy(...)
update_the_db(...)
registry.push([policy], events.CREATED)
def update_qos_policy(...):
policy = fetch_policy(...)
update_the_db(...)
registry.push([policy], events.UPDATED)
def delete_qos_policy(...):
policy = fetch_policy(...)
update_the_db(...)
registry.push([policy], events.DELETED)
标签:ext 有感 href end 插件 lib base pull 必须
原文地址:https://www.cnblogs.com/liuhongru/p/11956825.html