标签:业务 磁盘 过程 处理 back 配置 比较 str 修改
消息队列是典型的生产者-消费者模型,生产者只管发送消息,消费者只管监听队列取出消息,没有业务逻辑的侵入,实现了生产者和消费者的解耦,这篇博客主要从如下几个方面整理相关RabbitMQ的知识点
目前比较主流的两种MQ分别是JMS和AMQP
相同点: 两个担任的角色差不多,都是依据接口实现服务间的调用
不同点: 前者跨平台,跨语言,支持五种消息模型,后者只适用于java,仅规定了两种消息模型
常见的MQ产品
假设这样一个应用场景:
新用户来注册了,我们要求他填写手机的验证码,前端异步把手机号发送到后台,调用短息微服务发送短信,这时,用户可以接着填写剩下的信息,等验证码来了,一并提交给后台
假设这样一个应用场景:
商品微服务,调用A方法,修改了商品的信息,静态页面微服务得重新生成静态页,搜索微服务得重新创建新的文档,但是问题来了,有关静态页的生成的所有逻辑方法都在静态页微服务,搜索微服务雷同,我们总不至于在 A()里面去再写一遍关于生成静态页和创键新文档的逻辑吧,这是用RabbitMQ的作用来了
流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.
通过这个模型,可以知道,生产者想发送消息要做哪些准备工作 消费者想接收消息需要哪些准备工作,以及如何接收
坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
java代码,RabbitMQ-HelloWorld
生产者
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
消费者
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列, 队列可以重复声明,但是做好和生产者声明的参数保持一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {//这是个匿名内部类
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用(回调函数)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
}
};
// 监听队列,第二个参数:是否自动进行消息确认。 - 一般为了安全,我们不选择把第二个参数写成true 我们选择手动ack
channel.basicConsume(QUEUE_NAME, true, consumer); //将上面的队列和消费者进行绑定,第二个字段设置为true,自动进行消息确认,一旦有消息就回调函数接收
// 消费者类启动后,整个的过程不会停止,就像js的绑定事件,可以看到右上角和左下角的红点一直亮着
}
如果上面的消费者出现异常的话,程序也就停止了,那我们的业务逻辑就没办法执行,因此我们禁用autoACK,选择手动ack
/*@param queue the name of the queue
* @param autoAck true if the server should consider messages
* @param callback an interface to the consumer object
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
在回调函数中添加:
...业务逻辑
// 手动进行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
这样,在RabbitMQ中的消息状态是这样的 Ready-->Uacked-- 出现异常 --> Ready
工作队列,竞争消费模式,可以看到,通同一个队列我们绑定上了多个消费者,消费者争抢着消费消息,这可以有效的避免消息堆积,比如对于短信微服务集群来说就可以使用这种消息模型,来了请求,大家抢着消费掉,别等着
给队列添加一条属性,不再是队列把任务平均分配开给消费者,而是让消费者,消费完了后,问队列要新的任务,这样能者多劳
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
不同的订阅模型是根据交换机(Exchange)的类型划分的
订阅模型有三种
其中每个消费者拥有属于自己的队列,生产者直接把消息发送给交换机,由交换机决定到底把消息发送给谁
我们看一下,如何做到,一条消息被多个消费者消费
生产者
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 消息内容
String message = "Hello everyone";
// 发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
对于生产者来说,他不在去声明队列了, 获取完Channel之后,直接去创建交换机,然后发送消息
消费者:
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
对于消费者,现在没有和它的队列直接绑定的生产者了,它要多做一件事,就是把自己的队列绑定到交换机上Exchange,当它们做完这件事之后,他们都会收到相应的消息
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
拥有不同的RoutingKey的消费者,会收到来自交换机的不同信息,而不是大家都使用同一个Routing Key ,和广播模型区分开来
生产者
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
消费者,可以绑定多个RoutingKey收到不同的消息
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
这个模式支持使用通配符
// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);
// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
消费者
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
但是如果MQ挂了呢?
SpringAMQP帮我们实现了--生产者确认机制,对于不可路由的消息交换机会告诉生产者,使其重新发送
坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
生产者
rabbitmq:
host: 192.168.43.150
username: changwu..
password: 2424zcw......
virtual-host: /leyou
template: # 配置模板,给amqp template使用
retry: # 发送失败,重试的信息
enabled: true # 开启失败重试
initial-interval: 10000ms # 第一次重试的时间长
max-interval: 30000ms # 最长的重试时间间隔
multiplier: 2 # 下次重试的时间的倍数,这里的2,说明,下次重试的时间,是上次的两倍
exchange: ly.item.exchange # 交换机,这配置的是默认的交换机,rabbitmq发送往交换机发送消息,一会,不写交换机的话,默认创建这个交换机
publisher-confirms: true # 开启生产者确认,消息发送失败会重试
生产者使用AmqpTemplate模板发送消息
try{ // 新增商品后, 发送消息, 路由键 消息体
amqpTemplate.convertAndSend("item.update",1);
}catch (Exception e
){
System.out.println("修改结束,发送消息失败=="+e);
}
消费端
他不需要AmqpTemplate模板发送消息,因此不配置
rabbitmq:
host: 192.168.43.150
username: changwu
password: 2424zcw..
virtual-host: /XXX
virtual-host,和当前用户绑定的虚拟主机名, 这就Oralce里面,不同限权的用户可以看到的界面,拥有的能力是不用的,在RabbitMQ中,用户只能看到和它相关的虚拟主机下面的信息
@Component
public class Listener {
/**
* 注意这里面的异常我门自己不处理,交给springmvc,这样,一旦有异常,触发ack 消息回滚
* 监听 新增和修改(ES里,他俩是一个方法)
*
* 广播模型,消费者直接相关的是队列,
* 1. 它要把自己绑定到队列上,
* 2. 声明队列的名字,是否持久化
* 3. 声明交换机的名字,类型
* 4. 声明自己监听的路有键
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "search.item.insert.queue",durable = "ture"),
exchange = @Exchange(name = "ly.item.exchange",type = ExchangeTypes.TOPIC),
key = {"item.insert","item.update"}
))
public void listenInsertOrUpdate(Long spuId){
if(spuId==null){
return;
}
// dosomething...
}
标签:业务 磁盘 过程 处理 back 配置 比较 str 修改
原文地址:https://www.cnblogs.com/ZhuChangwu/p/11150519.html