标签:
概念:
生产者(Producer,简写P),负责发布消息。
“交换机”(Exchange, 简写X), 负责中转消息。
路由(Route, 简写R), 即 X->Q的路线名。
消息队列 (Queue, 简写Q), 负责临时存储消息。
消费者(Customer,简写C), 负责处理消息。
完整关系图解:
P: 负责发布消息, 可绑定到一个exchange上,默认的exchange名为空字符串,类型为direct。 推送消息时,需要指定路由名(routing_key)。
发布消息时,需要指定:
routing_key, 路由名
body,消息正文
properties,消息属性
AMQP协议定义的消息属性支持14种之多,最常用的有四种:
delivery_mode: 2---持久化消息。 其他值----临时消息(不存文件/数据库)
content_type: 内容类型(json类型设为: application/json)
reply_to: 回调队列名称,
correlation_id: 消息id, 可用于匹配响应内容。
X: 负责路由消息,类型有
topic类型的路由, 其名为"关键词1.关键次2.关键词n",每个关键词均可用通配符取代 (* / #)
* : 代表一个单词(关键字)
# : 代表0~n个单词(用.分隔)
路由情况:
eg.
A.orange.B 匹配*.orange.* , 去Q1
A.B.rabbit, 匹配*.*.rabbit, 去Q2
lazy.B.orange, 匹配 *.orange.* 和 laze.# 去 Q1和Q2, 消息被处理两次!
lazy.B.rabbit, 匹配 *.*.rabbit 和 lazy.# 去Q2, 由于是统一个队列,只投递一次。只处理一次!
hardworking.A.cow, 不匹配任何路由规则,消息将被丢弃!
路由的声明: 在绑定队列到Exchange上时声明。
路由:队列绑定到exchange上,需要指定路由,默认为‘‘(路由名为空时,匹配队列名)
Q: 队列, 缓存消息,
可进行持久化(durable), 在少数极端条件下,即使durable的队列消息也可能丢失。
要持久化消息,处理队列要声明为可持久化的,消息在推送时,也需要指定其属性为可持久化的(delivery_mode = 2)。
有临时队列(C指定,用exclusive属性声明队列), 随消费者销毁即销毁(专用)。
队列在分发消息给消费者(多个消费者)时,默认是按消息条数平分的。即:若队列(Q)里有10条消息, 有两个消费者(C1和C2), 那么C1得到(1,3,5,7,9), C2得到(2,4,6,8,10)。
若想队列在下发消息时,考虑到消费者的处理能力,做到处理大而耗时任务的少发,处理小而快速的任务的消费者多发(即负载均衡), 需要配置消费者同时处理最大任务数属性(prefetch)
pika 里面是: channel.basic_qos(prefetch=1)
C: 消费者可以绑定到一个队列上,进行监听,消息来了即处理, 从这个角度讲, 消费者可以作为server端。
消费者在轮询监听到消息,交给回调函数(callback)来处理消息。
消费者指定消息队列中的消息的删除模式(确认后删除/不需要确认删除),通过no_ack参数(默认为False, 即需要确认), 需要确认的消息在其回调函数中,得进行确认操作。以保证消息总能得到处理,不会丢失。默认, 如果一个消费者进程挂掉了, 没有确认消息处理完了, 消息队列将重发此消息给下一个消费者(保证服务可靠性)。
应用:
1. 提高系统并发性:
对于某些可以异步处理的任务(发送短信/邮件/推送等等),及时将任务publish到消息队列中,及时返回,不阻塞请求。
2. 应用于某些设计模式:
消息发布/订阅
3. rpc
demo参考这里。
转载请注明来源:http://www.cnblogs.com/Tommy-Yu/p/5802264.html
谢谢!
标签:
原文地址:http://www.cnblogs.com/Tommy-Yu/p/5802264.html