标签:rabbitmq spring boot (消费者处理消息)
RabbitMq消息消费者服务
本项目使用开发工具 IDEA 和 Spring Boot 来开发。
消息消费目前只是一个简单的Demo,后续会处理成更智能一些。
首先是配置类 RabbitMqConfig,里面配置了用户名、密码还有队列信息。
package com.basic.rabbitmq.consumer.config;
import com.basic.rabbitmq.consumer.listener.HandleMessageListenerAdapter;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
/**
* Rabbitmq配置类
* Created by sdc on 2017/7/4.
*/
@Configuration
@ComponentScan(basePackages = {"com.basic"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMqConfig {
@Autowired
private Environment env;
/**
 * Builds the RabbitMQ client {@link ConnectionFactory} from the application properties.
 *
 * <p>Host, username and password are read from {@code spring.rabbitmq.host},
 * {@code spring.rabbitmq.username} and {@code spring.rabbitmq.password}. The port and
 * virtual host are read from {@code spring.rabbitmq.port} and
 * {@code spring.rabbitmq.virtual-host}; when those properties are absent they fall back
 * to {@code 5672} and {@code "/"}, the values that used to be hard-coded here.
 *
 * @return the configured client connection factory
 * @throws Exception declared for compatibility with existing callers
 */
@Bean
public ConnectionFactory connectionFactory() throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
    // Configurable, with the previously hard-coded values as defaults.
    connectionFactory.setPort(env.getProperty("spring.rabbitmq.port", Integer.class, 5672));
    connectionFactory.setVirtualHost(env.getProperty("spring.rabbitmq.virtual-host", "/"));
    connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
    connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
    return connectionFactory;
}
/**
 * Wraps the RabbitMQ client factory from {@link #connectionFactory()} in a Spring
 * {@link CachingConnectionFactory}.
 *
 * @return the caching connection factory used by the other beans in this class
 * @throws Exception if {@link #connectionFactory()} fails
 */
@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
    ConnectionFactory rabbitClientFactory = connectionFactory();
    return new CachingConnectionFactory(rabbitClientFactory);
}
/**
 * Creates the {@link RabbitTemplate}, a helper class playing a role similar to
 * JdbcTemplate, backed by {@link #cachingConnectionFactory()}.
 *
 * <p>The template is created with {@code channelTransacted} set to {@code true}.
 *
 * @return the configured template
 * @throws Exception if {@link #cachingConnectionFactory()} fails
 */
@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
    CachingConnectionFactory connections = cachingConnectionFactory();
    RabbitTemplate template = new RabbitTemplate(connections);
    template.setChannelTransacted(true);
    return template;
}
/**
 * Exposes a {@link RabbitAdmin} backed by {@link #cachingConnectionFactory()} as the
 * {@link AmqpAdmin} bean.
 *
 * @return the admin instance
 * @throws Exception if {@link #cachingConnectionFactory()} fails
 */
@Bean
public AmqpAdmin amqpAdmin() throws Exception {
    CachingConnectionFactory connections = cachingConnectionFactory();
    AmqpAdmin admin = new RabbitAdmin(connections);
    return admin;
}
/**
 * Creates the listener container that delivers messages from the configured queue to
 * the given listener adapter.
 *
 * <p>The queue name is read from the {@code emial.server.queue} property. The property
 * is mandatory: if it is missing, startup fails with an {@link IllegalStateException}
 * naming the key rather than with a bare {@link NullPointerException}.
 *
 * @param handleMessageListenerAdapter the listener that receives each message
 * @return the configured listener container
 * @throws Exception if {@link #cachingConnectionFactory()} fails
 */
@Bean
public SimpleMessageListenerContainer listenerContainer(
        @Qualifier("handleMessageListenerAdapter") HandleMessageListenerAdapter handleMessageListenerAdapter) throws Exception {
    // Queue name; getRequiredProperty throws IllegalStateException when the key is absent.
    String queueName = env.getRequiredProperty("emial.server.queue").trim();

    // Single message listener container bound to that queue.
    SimpleMessageListenerContainer simpleMessageListenerContainer =
            new SimpleMessageListenerContainer(cachingConnectionFactory());
    simpleMessageListenerContainer.setQueueNames(queueName);
    simpleMessageListenerContainer.setMessageListener(handleMessageListenerAdapter);

    // Manual ACK: the listener must acknowledge each message itself once it has been
    // consumed successfully, after which RabbitMQ removes it from the queue.
    simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return simpleMessageListenerContainer;
}
}
我这里配置了一个 SimpleMessageListenerContainer 这个 Bean,用来监听队列里的消息。
具体的消息处理类如下:
package com.basic.rabbitmq.consumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.mail.MailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 监听消息的处理适配器
* Created by sdc on 2017/7/10.
*/
@Component("handleMessageListenerAdapter")
public class HandleMessageListenerAdapter extends MessageListenerAdapter {
// @Resource
// private JavaMailSender mailSender;
/**
 * Handles one delivered message: prints its body and acknowledges it manually.
 *
 * <p>Much like an ActiveMQ listener, the work happens in {@code onMessage}. The body
 * is decoded explicitly as UTF-8 so the result does not depend on the JVM's platform
 * default charset.
 *
 * @param message the delivered message
 * @param channel the channel the message arrived on, used to send the acknowledgement
 * @throws Exception if acknowledging the message fails
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    // Message body, decoded with a fixed charset instead of the platform default.
    String messageDetail = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    System.out.println("消息消费:" + messageDetail);
    // Manual ACK for this delivery only (multiple = false).
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
还有一些配置文件,请看
http://10103778.blog.51cto.com/10093778/1945756
这个博客,就可以看到具体的配置了。
启动这个项目,就可以从队列消费消息了。消费者还是比较简单的,对应到相应的队列就可以处理消息了。
本文出自 “10093778” 博客,请务必保留此出处http://10103778.blog.51cto.com/10093778/1945974
标签:rabbitmq spring boot (消费者处理消息)
原文地址:http://10103778.blog.51cto.com/10093778/1945974