标签:res sse app process 定义 自定义 strategy queue header
引用依赖pom.xml
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
RabbitMQConfig
package com.cx.temp.common.rabbitmq.spring; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
测试类
package com.cx.temp.rabbitmq; import com.cx.temp.admin.AdminApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.HashMap; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = AdminApplication.class) public class RabbitMQTest { @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { //第一种声明与绑定方式 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //第二种 支持链式声明与绑定 rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.topic.queue", false)) .to(new TopicExchange("test.topic", false, false)) .with("user.#")); rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue", false); } }
// Excerpt from testAdmin(): the explicit (non-builder) declaration style.
// Declare the direct exchange "test.direct" (both boolean flags false).
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
// Declare the queue "test.direct.queue" (flag false).
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
// Bind the queue to the exchange with routing key "direct" and an empty argument map.
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
代码实现:
package com.cx.temp.common.rabbitmq.spring; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1.设置交换机类型 * 2.将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange: 通过添加属性key-value匹配 * DirectExchange: 按照routingkey分发到指定队列 * TopicExchange: 多关键字匹配 * @return */ @Bean public TopicExchange exchange001(){ return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001(){ return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002(){ return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002(){ return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean 
public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } }
RabbitTemplate,即消息模板
代码实现:
package com.cx.temp.common.rabbitmq.spring; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1.设置交换机类型 * 2.将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange: 通过添加属性key-value匹配 * DirectExchange: 按照routingkey分发到指定队列 * TopicExchange: 多关键字匹配 * @return */ @Bean public TopicExchange exchange001(){ return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001(){ return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002(){ return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002(){ return 
BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } }
package com.cx.temp.common.rabbitmq.spring; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("8.131.95.173:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1.设置交换机类型 * 2.将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange: 通过添加属性key-value匹配 * DirectExchange: 按照routingkey分发到指定队列 * TopicExchange: 多关键字匹配 * @return */ @Bean public TopicExchange exchange001(){ return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001(){ return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002(){ return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002(){ return 
BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } }
运行后
将消息读出来
综合多种写法
package com.cx.temp.rabbitmq; import com.cx.temp.admin.AdminApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.HashMap; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = AdminApplication.class) public class RabbitMQTest { @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { //第一种声明与绑定方式 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //第二种 支持链式声明与绑定 rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.topic.queue", false)) .to(new TopicExchange("test.topic", false, false)) .with("user.#")); rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue", false); } @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", 
"自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { //消息发送之后在对这个message进行设置 @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("----------添加额外的设置-----------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); } @Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc",message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!"); } }
查看queue001消息
代码实现:
通过启动SpringBootApplication来启动rabbitmq
package com.cx.temp.admin; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; import org.springframework.context.annotation.ComponentScan; @SpringBootApplication @ComponentScan(basePackages = {"com.cx.temp"}) public class AdminApplication extends SpringBootServletInitializer{ @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(AdminApplication.class); } public static void main(String[] args) { try { SpringApplication.run(AdminApplication.class, args); } catch (Exception e) { e.printStackTrace(); } } }
RabbitMQConfig
package com.cx.temp.admin.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import java.util.UUID; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1.设置交换机类型 * 2.将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange: 通过添加属性key-value匹配 * DirectExchange: 按照routingkey分发到指定队列 * TopicExchange: 多关键字匹配 * @return */ @Bean public TopicExchange exchange001(){ return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001(){ return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002(){ return new TopicExchange("topic002", true, false); } @Bean public Queue 
queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding002(){ return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Binding binding003(){ return BindingBuilder.bind(queue003()).to(exchange002()).with("spring.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); container.setDefaultRequeueRejected(false); //不进行重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //自动签收机制 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("-----------消费者:" + msg); } }); return container; } }
启动后,AdminApplication控制台收到
RabbitMQTest控制台
这里是由于Test启动也启动了RabbitMQConfig配置,所以在Test控制台消费了一条,到Application只剩2条可消费
标签:res sse app process 定义 自定义 strategy queue header
原文地址:https://www.cnblogs.com/huihui-hui/p/14397026.html