标签:messages 持久化 单播 err stop targe bsp point 模块
AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
RabbitMQ是一个实现了AMQP协议标准的开源消息代理和队列服务器。
1、基本概念
在服务器中,三个主要功能模块连接成一个处理链完成预期的功能:
1)“exchange”接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到“消息队列”。
2)“message queue”存储消息,直到这些消息被消费者安全处理完为止。
3)“binding”定义了exchange和message queue之间的关联,提供路由规则。
使用这个模型我们可以很容易的模拟出存储转发队列和主题订阅这些典型的消息中间件概念。
上图中各个模块的说明如下:
Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等。
Connection: publisher、consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
2、Exchange和Binding
交换机Exchange拿到一个消息之后会将它路由给队列。Exchange使用哪种方式路由是由Binding规则决定的。
1)直连交换机
根据消息携带的路由键(routing key)将消息投递给对应队列。直连交换机用来处理消息的单播路由。
Message中的“routing key”如果和Binding中的“binding key”一致, Direct exchange则将message发到对应的queue中。
2)主题交换机
通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机用来实现消息的多播路由。
3)扇形交换机
将消息路由给绑定到它身上的所有队列,且不理会路由键。扇形交换机用来处理消息的广播路由。
3、RabbitMQ
send.py
# -*- coding:utf-8 -*- import pika # 权限验证 credentials = pika.PlainCredentials(‘admin‘, ‘bigdata123‘) # 链接参数 params = pika.ConnectionParameters( host=‘10.93.21.21‘, port=5077, virtual_host=‘/‘, credentials=credentials ) # 建立链接 connection = pika.BlockingConnection(parameters=params) # 从链接中获得信道 channel = connection.channel() # 声明交换机 channel.exchange_declare( exchange=‘exchangeA‘, exchange_type=‘direct‘, passive=False, durable=True, auto_delete=False ) # delivery_mode=2表示让消息持久化, 重启RabbitMQ也不丢失. # 考虑成本, 开启此功能, 建议把消息存储到SSD上. props = pika.BasicProperties(content_type=‘text/plain‘, delivery_mode=2) channel.basic_publish( exchange=‘exchangeA‘, routing_key=‘a_routing_key‘, body=‘Hello World!‘, properties=props ) print(" [x] Sent ‘Hello World!‘") # 关闭链接 connection.close()
receive.py
# -*- coding:utf-8 -*- import pika # 权限验证 credentials = pika.PlainCredentials(‘admin‘, ‘bigdata123‘) # 链接参数 params = pika.ConnectionParameters( ‘10.93.21.21‘, 5077, ‘/‘, credentials ) # 建立链接 connection = pika.BlockingConnection(parameters=params) # 从链接中获得信道 channel = connection.channel() # 声明交换机 channel.exchange_declare( exchange=‘exchangeA‘, exchange_type=‘direct‘, passive=False, durable=True, auto_delete=False ) # 声明队列, 如果没有就创建 channel.queue_declare(queue=‘standard‘, auto_delete=True) # 通过路由键将队列和交换机绑定 channel.queue_bind( queue=‘standard‘, exchange=‘exchangeA‘, routing_key=‘a_routing_key‘ ) # 处理接收到的消息的回调函数 # method_frame携带了投递标记, header_frame表示AMQP信息头的对象 def callback(channel, method_frame, header_frame, body): channel.basic_ack(delivery_tag=method_frame.delivery_tag) print(" [x] Received %r" % body) # 订阅队列 channel.basic_consume( callback, queue=‘standard‘ ) try: print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() # 关闭链接 connection.close()
标签:messages 持久化 单播 err stop targe bsp point 模块
原文地址:http://www.cnblogs.com/kangoroo/p/7632913.html