标签:group lse imp mqc exception .net 消息队列 tty bbb
1.添加 maven 项目依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2.添加 spring-rabbitmq.xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings. The host / username / password / port values below are
         placeholders and must be replaced with real ones. If the MQ server is remote,
         create a dedicated user and use its credentials: guest is not allowed to log in
         remotely by default. -->
    <rabbit:connection-factory id="connectionFactory" host="host" username="user" password="pwd" port="port" virtual-host="/" channel-cache-size="5"/>

    <!-- Admin bean: declares the exchanges and queues defined in this file automatically,
         so they do not have to be created by hand. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test1"/>
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test2"/>

    <!-- Direct exchange and its queue bindings (binding keys) -->
    <rabbit:direct-exchange name="rabbit.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="rabbit.queue.test1" key="rabbit.queue.test1.key"/>
            <rabbit:binding queue="rabbit.queue.test2" key="rabbit.queue.test2.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AMQP converts outgoing payloads to JSON with a Jackson-based converter by
         default; the author replaces it with a Gson-based implementation, citing speed. -->
    <bean id="jsonMessageConverter" class="top.tarencez.ssmdemo.config.Gson2JsonMessageConverter"/>

    <!-- Template declaration.
         NOTE(review): the default routing-key below does not match any binding key declared
         in this file, so a send that relies on the default would not reach test1 or test2.
         MQProducer passes an explicit routing key on every send. Confirm whether the default
         is intentional. -->
    <rabbit:template id="amqpTemplate" exchange="rabbit.queue.exchange" routing-key="spring.queue.tag.key" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <bean id="receiveMessageListener" class="top.tarencez.ssmdemo.common.component.MQListenter" />

    <!-- Queue listeners: when a message arrives, the listener registered on that queue
         is notified. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="rabbit.queue.test1" ref="receiveMessageListener" />
        <!--<rabbit:listener queues="rabbit.queue.test2" ref="receiveMessageListener" />-->
    </rabbit:listener-container>
</beans>
3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml
<import resource="classpath:conf/spring-rabbitmq.xml"/>
4.Gson配置
package top.tarencez.ssmdemo.config; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractJsonMessageConverter; import org.springframework.amqp.support.converter.ClassMapper; import org.springframework.amqp.support.converter.DefaultClassMapper; import org.springframework.amqp.support.converter.MessageConversionException; import com.google.gson.Gson; public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter { private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class); private static ClassMapper classMapper = new DefaultClassMapper(); private static Gson gson = new Gson(); public Gson2JsonMessageConverter() { super(); } @Override protected Message createMessage(Object object, MessageProperties messageProperties) { byte[] bytes = null; try { String jsonString = gson.toJson(object); bytes = jsonString.getBytes(getDefaultCharset()); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(getDefaultCharset()); if (bytes != null) { messageProperties.setContentLength(bytes.length); } classMapper.fromClass(object.getClass(), messageProperties); return new Message(bytes, messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { Object content = null; MessageProperties properties = message.getMessageProperties(); if (properties != null) { String contentType = properties.getContentType(); if (contentType != null && contentType.contains("json")) { String encoding = properties.getContentEncoding(); if (encoding == null) { encoding = getDefaultCharset(); } try { Class<?> 
targetClass = getClassMapper().toClass( message.getMessageProperties()); content = convertBytesToObject(message.getBody(), encoding, targetClass); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } } else { log.warn("Could not convert incoming message with content-type [" + contentType + "]"); } } if (content == null) { content = message.getBody(); } return content; } private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws UnsupportedEncodingException { String contentAsString = new String(body, encoding); return gson.fromJson(contentAsString, clazz); } }
5.生产者接口及接口调用
package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Thin producer wrapper around the injected {@link AmqpTemplate}.
 */
@Component
public class MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it through the template. Failures are reported
     * on standard error and never propagated to the caller.
     *
     * @param queueKey passed to the template as the routing key of the send
     * @param message the payload handed to the template for conversion
     */
    public void sendMessage(String queueKey, Object message) {
        System.out.println("===== " + amqpTemplate);
        try {
            amqpTemplate.convertAndSend(queueKey, message);
            System.out.println("===== 消息发送成功 =====");
        } catch (Exception e) {
            // Keep the best-effort contract, but do not lose the routing key or the cause.
            System.err.println("===== 消息发送失败, routingKey=" + queueKey + " =====");
            e.printStackTrace();
        }
    }
}
package top.tarencez.ssmdemo.rabbitmq.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import top.tarencez.ssmdemo.common.component.MQProducer;
import top.tarencez.ssmdemo.shiro.vo.TestVO;

/**
 * Demo controller that publishes a sample {@link TestVO} through {@link MQProducer}.
 */
@Controller
@RequestMapping("/mq")
public class MQController {

    @Autowired
    private MQProducer mqProducer;

    /**
     * Handles {@code /mq/test}: builds a fixed sample payload and sends it with the
     * routing key {@code rabbit.queue.test1.key}.
     *
     * <p>{@code @ResponseBody} marks the response as complete. Without it, a
     * {@code void} handler makes Spring MVC look for a view named after the request path.
     */
    @RequestMapping("/test")
    @ResponseBody
    public void test() {
        System.out.println("===== test mq send =====");
        TestVO testVO = new TestVO();
        testVO.setId(1);
        testVO.setName1("aaa");
        testVO.setName2("bbb");
        mqProducer.sendMessage("rabbit.queue.test1.key", testVO);
    }
}
6.消费者接口
package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * Listener that prints the body of each delivered message to standard output.
 */
@Component
public class MQListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 and prints it without a trailing newline.
     * If the charset is unsupported, the stack trace is printed and nothing else is output.
     *
     * @param msg the delivered message
     */
    @Override
    public void onMessage(Message msg) {
        final String text;
        try {
            text = new String(msg.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            ex.printStackTrace();
            return;
        }
        System.out.print("===== 接受到消息:" + text);
    }
}
7.测试验证
1.添加 maven 项目依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2.添加 spring-rabbitmq.xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings. The host / username / password / port values below are
         placeholders and must be replaced with real ones. If the MQ server is remote,
         create a dedicated user and use its credentials: guest is not allowed to log in
         remotely by default. -->
    <rabbit:connection-factory id="connectionFactory" host="host" username="user" password="pwd" port="port" virtual-host="/" channel-cache-size="5"/>

    <!-- Admin bean: declares the exchanges and queues defined in this file automatically,
         so they do not have to be created by hand. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test1"/>
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test2"/>

    <!-- Direct exchange and its queue bindings (binding keys) -->
    <rabbit:direct-exchange name="rabbit.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="rabbit.queue.test1" key="rabbit.queue.test1.key"/>
            <rabbit:binding queue="rabbit.queue.test2" key="rabbit.queue.test2.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AMQP converts outgoing payloads to JSON with a Jackson-based converter by
         default; the author replaces it with a Gson-based implementation, citing speed. -->
    <bean id="jsonMessageConverter" class="top.tarencez.ssmdemo.config.Gson2JsonMessageConverter"/>

    <!-- Template declaration.
         NOTE(review): the default routing-key below does not match any binding key declared
         in this file, so a send that relies on the default would not reach test1 or test2.
         MQProducer passes an explicit routing key on every send. Confirm whether the default
         is intentional. -->
    <rabbit:template id="amqpTemplate" exchange="rabbit.queue.exchange" routing-key="spring.queue.tag.key" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <bean id="receiveMessageListener" class="top.tarencez.ssmdemo.common.component.MQListenter" />

    <!-- Queue listeners: when a message arrives, the listener registered on that queue
         is notified. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="rabbit.queue.test1" ref="receiveMessageListener" />
        <!--<rabbit:listener queues="rabbit.queue.test2" ref="receiveMessageListener" />-->
    </rabbit:listener-container>
</beans>
3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml
<import resource="classpath:conf/spring-rabbitmq.xml"/>
4.Gson配置
package top.tarencez.ssmdemo.config;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.google.gson.Gson;
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);
private static ClassMapper classMapper = new DefaultClassMapper();
private static Gson gson = new Gson();
public Gson2JsonMessageConverter() {
super();
}
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
byte[] bytes = null;
try {
String jsonString = gson.toJson(object);
bytes = jsonString.getBytes(getDefaultCharset());
} catch (IOException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(getDefaultCharset());
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
classMapper.fromClass(object.getClass(), messageProperties);
return new Message(bytes, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.contains("json")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = getDefaultCharset();
}
try {
Class<?> targetClass = getClassMapper().toClass(
message.getMessageProperties());
content = convertBytesToObject(message.getBody(),
encoding, targetClass);
} catch (IOException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
} else {
log.warn("Could not convert incoming message with content-type ["
+ contentType + "]");
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws
UnsupportedEncodingException {
String contentAsString = new String(body, encoding);
return gson.fromJson(contentAsString, clazz);
}
}
5.生产者接口及接口调用
package top.tarencez.ssmdemo.common.component;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Thin producer wrapper around the injected {@link AmqpTemplate}.
 */
@Component
public class MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it through the template. Failures are reported
     * on standard error and never propagated to the caller.
     *
     * @param queueKey passed to the template as the routing key of the send
     * @param message the payload handed to the template for conversion
     */
    public void sendMessage(String queueKey, Object message) {
        System.out.println("===== " + amqpTemplate);
        try {
            amqpTemplate.convertAndSend(queueKey, message);
            System.out.println("===== 消息发送成功 =====");
        } catch (Exception e) {
            // Keep the best-effort contract, but do not lose the routing key or the cause.
            System.err.println("===== 消息发送失败, routingKey=" + queueKey + " =====");
            e.printStackTrace();
        }
    }
}
package top.tarencez.ssmdemo.rabbitmq.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import top.tarencez.ssmdemo.common.component.MQProducer;
import top.tarencez.ssmdemo.shiro.vo.TestVO;
@Controller
@RequestMapping("/mq")
public class MQController {
@Autowired
private MQProducer mqProducer;
@RequestMapping("/test")
public void test() {
System.out.println("===== test mq send =====");
TestVO testVO = new TestVO();
testVO.setId(1);
testVO.setName1("aaa");
testVO.setName2("bbb");
mqProducer.sendMessage("rabbit.queue.test1.key", testVO);
}
}
6.消费者接口
package top.tarencez.ssmdemo.common.component;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * Listener that prints the body of each delivered message to standard output.
 */
@Component
public class MQListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 and prints it without a trailing newline.
     * If the charset is unsupported, the stack trace is printed and nothing else is output.
     *
     * @param msg the delivered message
     */
    @Override
    public void onMessage(Message msg) {
        final String text;
        try {
            text = new String(msg.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            ex.printStackTrace();
            return;
        }
        System.out.print("===== 接受到消息:" + text);
    }
}
7.测试验证
参考文章:
https://www.cnblogs.com/tohxyblog/p/7256554.html
https://blog.csdn.net/qq_37936542/article/details/80111555
https://www.cnblogs.com/s648667069/p/6401463.html
标签:group lse imp mqc exception .net 消息队列 tty bbb
原文地址:https://www.cnblogs.com/tarencez/p/10886938.html