标签:部分 引入 订阅模式 class ping string 协议 autowired connect
消息队列(Message Queue,简称MQ)本质是个队列,遵循先入先出,主要用途:不同进程Process/线程Thread之间通信 。
可以用来在两个进程间进行异步的数据交换。进程A把消息放入消息队列中,然后继续执行后续任务;进程B从消息队列中获取消息,根据消息执行任务。
高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计 ,和语言无关。
Java消息服务(Java Message Service)应用程序接口 ,是一个Java平台中关于面向消息中间件的API
以上两个是关于JMS和AMQP协议的不同点的介绍。
组成部分:
消息队列服务进程,包括两个部分,Exchange和Queue
消息队列交换机,按照一定的规则把消息转发到某个队列。
存储消息的队列,
消息生产者
消息消费者
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象 ,Connection是RabbitMQ的socket链接 ,ConnectionFactory用来创建Connection,一个连接中可以有多个通道channel,应用程序通过通道和rabbitmq连接。
rabbitmq的操作都是基于通道的。
队列是RabbitMQ的内部对象,用于存储消息
多个消费者可以连接同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理 ,当然也可以设置成不平均分配。
创建maven工程,引入相关依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
package com.lyy;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer01 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
//创建与rabbitmq的tcp连接
Connection connection=factory.newConnection();
//创建与exchange连接的通道,每个连接可以创建多个通道,每个通道代表不同的任务
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("lyy-test-01",false,false,false,null);
for (int i = 0; i < 10; i++) {
String msg="hello world:"+i;
//发布消息
channel.basicPublish("","lyy-test-01",null,msg.getBytes("utf-8"));
System.out.println(msg+"-->发送成功");
}
//关闭
channel.close();
connection.close();
}
}
package com.lyy;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
//创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("lyy-test-01",false,false,false,null);
//定义消费方法
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange=envelope.getExchange();
String routeKey=envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
String meesage=new String(body,"utf-8");
System.out.println("receive message:"+meesage);
}
};
//监听队列
channel.basicConsume("lyy-test-01",true,consumer);
}
}
入门程序算一种最简单的模式
两个消费端共同消费同一个队列中的消息 ,对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
特点:
一条消息只会被发送给一个消费者,rabbitmq采用轮询的方式将消息平均发送给多个消费者。
代码实现和入门程序类似,同时启动两个消费者。
发布订阅模式,每个消费者都监听自己的队列,生产者将消息发送给Broker,交换机将消息转发给绑定的每个队列,队列绑定的消费者都会收到消息。
与工作队列模式相比,多了一个交换机,生产者和交换机连接。实际上工作队列模式连接的是一个默认的交换机。
在发布订阅的基础上还可以和工作队列模式组合,即给这两个队列连接多个消费者,这样两个队列依然都会收到消息,但是队列只会把一条消息发送给一个消费者,即多个消费者监听同一个队列不会重复消费消息。
生产者
/**
* 发布订阅模式的生产者
*/
public class Producer02 {
public static void main(String[] args) throws Exception {
String QUEUE_1="test_lyy_publish_1";
String QUEUE_2="test_lyy_publish_2";
String EXCHANGE_FANOUT="exchange_fanout_test";
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
//2.获取连接
Connection connection = factory.newConnection();
//3.创建通道
Channel channel = connection.createChannel();
//4.创建交换机
AMQP.Exchange.DeclareOk exchange = channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);
//5.创建队列,绑定交换机
channel.queueDeclare(QUEUE_1, false, false, false, null);
channel.queueDeclare(QUEUE_2,false,false,false,null);
//交换机和队列绑定
channel.queueBind(QUEUE_1,EXCHANGE_FANOUT,"");//最后一个参数表示routingKey,没有时设空字符串
channel.queueBind(QUEUE_2,EXCHANGE_FANOUT,"");
//发送消息
channel.basicPublish(EXCHANGE_FANOUT,"",null,"publish send successs".getBytes("utf-8"));
//关闭连接
channel.close();
connection.close();
}
}
消费者1,绑定队列1,
/**
* 发布订阅模式消费者1
*/
public class Consumer02_1 {
public static void main(String[] args) throws IOException, TimeoutException {
String QUEUE_1="test_lyy_publish_1";
String EXCHANGE_FANOUT="exchange_fanout_test";
//创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
//获取连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机,如果已经存在不会重复创建
AMQP.Exchange.DeclareOk exchange = channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);
//声明队列
channel.queueBind(QUEUE_1,EXCHANGE_FANOUT,"");
//绑定队列和交换机
channel.queueBind(QUEUE_1,EXCHANGE_FANOUT,"");
//定义消费方法
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message receive:"+new String(body,"utf-8"));
}
};
//监听队列
channel.basicConsume(QUEUE_1,true,consumer);
}
}
消费者2,绑定队列2,
/**
* 发布订阅模式消费者2
*/
public class Consumer02_2 {
public static void main(String[] args) throws IOException, TimeoutException {
String QUEUE_2="test_lyy_publish_2";
String EXCHANGE_FANOUT="exchange_fanout_test";
//创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
//获取连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机,如果已经存在不会重复创建
AMQP.Exchange.DeclareOk exchange = channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);
//声明队列
channel.queueBind(QUEUE_2,EXCHANGE_FANOUT,"");
//绑定队列和交换机
channel.queueBind(QUEUE_2,EXCHANGE_FANOUT,"");
//定义消费方法
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message receive:"+new String(body,"utf-8"));
}
};
//监听队列
channel.basicConsume(QUEUE_2,true,consumer);
}
}
每个消费者监听自己的队列,并设置routingKey;生产者将消息发送给交换机,由交换机根据路由key把消息发送给指定的队列。
注意使用的交换机类型和发布订阅模式不一样,类型为direct.
生产者: 绑定队列和交换机时需要指定routingKey,生产者发送消息时也需要指定routingKing,
消费者:绑定队列和交换机时需要指定routingKey,
注意:
在生产者或者消费者方都可以声明队列并绑定routing,只要进行了一次绑定,当前队列中就会产生一条绑定记录,即存在这个routingkey。上图是一个队列的绑定记录。
当生产者发送消息时,如果指定的routingKey和某个队列中的绑定记录中的routingKey能对上,这个消息就会被发送到此队列,然后和该队列相连的消费者不管设置的routingkey是多少,都将收到这条消息。上图中此队列有三个绑定记录,此时生产者不管以这三个的那个来发送消息,该队列都会收到消息,然后和它相连的消费者就会收到消息,与消费者中绑定队列和交换机时指定的routingKey无关。
生产者
/**
* 路由模式生产者
*/
public class Producer03 {
public static void main(String[] args) throws IOException, TimeoutException {
String QUEUE_1="queue_lyy_routing_1";
String QUEUE_2="queue_lyy_routing_2";
String EXCHANGE_DIRECT="exchange_direct_test";
String ROUTING_KEY="routing_test";
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
//2.获取连接
Connection connection = factory.newConnection();
//3.创建通道
Channel channel = connection.createChannel();
//4.声明交换机
channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
//5.声明队列
channel.queueDeclare(QUEUE_1,false,false,false,null);
channel.queueDeclare(QUEUE_2,false,false,false,null);
//6.绑定队列和交换机,绑定时指定路由key
channel.queueBind(QUEUE_1,EXCHANGE_DIRECT,ROUTING_KEY);
channel.queueBind(QUEUE_2,EXCHANGE_DIRECT,"routing_test_001");
//发送消息
channel.basicPublish(EXCHANGE_DIRECT,ROUTING_KEY,null,"success routing key".getBytes("utf-8"));
channel.close();
connection.close();
}
}
生产者中声明了队列1,2,并以指定的路由key发送消息
消费者
/**
* 路由模式消费者2
*/
@SuppressWarnings("all")
public class Consumer03_2 {
public static void main(String[] args) throws IOException, TimeoutException {
String QUEUE_2="queue_lyy_routing_2";
String EXCHANGE_DIRECT="exchange_direct_test";
String ROUTING_KEY="routing_test_3";
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
//2.获取连接
Connection connection = factory.newConnection();
//3.创建通道
Channel channel = connection.createChannel();
//4.声明交换机
channel.queueDeclare(QUEUE_2,false,false,false,null);
//5.声明交换机
channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
//绑定交换机和队列,指定路由key
channel.queueBind(QUEUE_2,EXCHANGE_DIRECT,ROUTING_KEY);
//定义消费方法
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
System.out.println("消费者2,"+routingKey);
System.out.println("消费者2:message receive:"+new String(body,"utf-8"));
}
};
//监听队列
channel.basicConsume(QUEUE_2,true,consumer);
}
}
消费者监听队列2,只要队列2收到了消息,该消费者就会收到消息,与其绑定队列和交换机时指定的routingkey无关。
可以绑定队列和交换机,也可以解除绑定channel.queueUnbind(参数)
与路由模式类似,但routingKey是带通配符的,某个队列绑定的路由key符合规则exchange就会发送消息到此队列
要注意的是,交换机类型是topic,绑定交换机和队列时指定的routingKey必须是带统配符的,发送消息时使用的routingKey是不带通配符的。
生产者:
/**
* 生产者4,通配符模式
*/
public class Producer04 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setPort(5672);
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("info_lyy", BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare("queue_info_sms",false,false,false,null);//发送短信消息的队列
channel.queueDeclare("queue_info_email",false,false,false,null);//邮件消息队列
//绑定交换机
//统配符以.分割,#可以匹配0个或多个单词,*可以匹配一个词
channel.queueBind("queue_info_sms","info_lyy","info.#.sms");
channel.queueBind("queue_info_email","info_lyy","info.email.#");
String msg="注册成功";
//发送短信消息
//channel.basicPublish("info_lyy","info.sms",null,msg.getBytes("utf-8"));
//发送邮件消息
channel.basicPublish("info_lyy","info.email",null,msg.getBytes("utf-8"));
//发送邮件和短信
//channel.basicPublish("info_lyy","info.email.sms",null,msg.getBytes("utf-8"));
System.out.println("发送成功");
channel.close();
connection.close();
}
}
生产者中绑定队列和交换机时使用带通配符的routingKey,发送消息时使用不带通配符的routingKey
消费者监听指定的队列,如消费者1监听queue_info_sms这个队列,这个队列和交换机绑定时使用的routingKey带通配符,如果生产者发送消息时使用的routingKey能和通配符匹配上,该队列就能收到消息,和它相连的消费者就能收到消息。
注意队列和交换机的声明以及绑定在消费者或生产者的任一方都可以,也可以两边都进行。声明时如果队列已存在不会重复创建。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
application:
name: rabbit-mq-springboot
rabbitmq:
host: 127.0.1
username: guest
password: guest
virtual-host: /
package com.lyy;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.lyy")
public class RabbitMqSpringBootStudyApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqSpringBootStudyApplication.class);
}
/**
* 最简单的配置,配置一个队列,会和默认交换机相连
* @return
*/
@Bean
public Queue queue(){
return new Queue("rabbit-mq-springboot-01");
}
/**
* 配置接收短信消息的队列
* @return
*/
@Bean
public Queue smsQueue(){
return new Queue("queue_info_sms_springboot",false);
}
/**
* 配置接收邮件消息的队列
* @return
*/
@Bean
public Queue emailQueue(){
return new Queue("queue_info_email_springboot",false);
}
/**
* 处理注册消息的交换机
* @return
*/
@Bean
public Exchange registerMessageExchange(){
return ExchangeBuilder.topicExchange("registerMessageExchange").durable(false).build();
}
/**
* 绑定接收邮件消息的队列到交换机
* @return
*/
@Bean
public Binding bindEmailQueue(@Qualifier("registerMessageExchange") Exchange exchange,@Qualifier("emailQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("info.email.#").noargs();
}
}
使用springboot提供的操作rabbitmq的对象RabbitTemplate来发送消息
/**
* 发送消息的Controller
*/
@RestController
@RequestMapping("/message")
public class MessageProuductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendEmail")
public String sendEmail(){
String message="send email";
/**
* param1:routingKey,
* param2:消息内容
* 这个两个参数的方法是把消息发送到默认的交换机,这个routingKey实际指定的是队列的名称
*/
rabbitTemplate.convertAndSend("rabbit-mq-springboot-01",message);
return "success";
}
@GetMapping("/sendEmailV2")
public String sendEmailV2(){
String message="send email v2";
/**
* exchange:交换机名称,这里指定的默认交换机的名称
* routingKey:当连接到默认交换机时指定的是队列的名称
*/
rabbitTemplate.convertAndSend("","rabbit-mq-springboot-01",message);
return "success";
}
@GetMapping("/sendEmailV3")
public String sendEmailV3() {
String message="注册成功";
//发送消息时指定的routingKey不带通配符
rabbitTemplate.convertAndSend("registerMessageExchange","info.email.sms",message);
return "success";
}
}
在一个bean中使用@RabbitListener注解指定一个方法为处理消息的方法
@Component
public class MessageReceiver {
@RabbitListener(queues = "rabbit-mq-springboot-01")
public void process(String msg) {
System.out.println("Receiver : " + msg);
}
@RabbitListener(queues = "queue_info_email_springboot")
public void emailReceiver(String msg, Message message){
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
System.out.println("routingKey is:"+receivedRoutingKey);
System.out.println("email Receiver : " + msg);
}
}
其中Message对象是spring和amqp整合的包中提供的,封装了消息的一些属性,例如routingKey
标签:部分 引入 订阅模式 class ping string 协议 autowired connect
原文地址:https://www.cnblogs.com/chengxuxiaoyuan/p/12985287.html