标签:ace one @param sys use turn top 当前时间 out
测试启动类
/**
 * Launcher for the XML-based Spring AMQP demo.
 *
 * <p>Loads {@code rabbitmq.xml} from the classpath, looks up the {@code RabbitTemplate}
 * bean and publishes a single text message using the template's default exchange and
 * routing key.
 */
public class SpringTest {

    /**
     * Starts the Spring context, sends one message and shuts the context down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the context is closed even when the bean lookup
        // or the send throws; close() also replaces the deprecated destroy() call.
        try (ClassPathXmlApplicationContext context =
                     new ClassPathXmlApplicationContext("rabbitmq.xml")) {
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            template.convertAndSend("Spring整合RabbitMQXML消息");
        }
    }
}
消费者
/**
 * Plain message handler; the listener container in {@code rabbitmq.xml} is configured
 * to call {@link #test(String)} for messages arriving on the queue.
 */
public class MyConsumer {

    /**
     * Receives a message and prints it to standard output.
     *
     * @param message the received message payload
     */
    public void test(String message) {
        final String line = "接收到的消息为: ".concat(String.valueOf(message));
        System.out.println(line);
    }
}
resources/rabbitmq.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
                           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
                           http://www.springframework.org/schema/beans
                           https://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Connection factory pointing at the broker -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.100.86"
                               port="5672"
                               username="admin"
                               password="admin"
                               virtual-host="/test"
                               publisher-confirms="true"/>

    <!-- RabbitMQ template; messages are published to the fanout exchange by default -->
    <rabbit:template id="template"
                     connection-factory="connectionFactory"
                     exchange="fanoutExchange"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue definition -->
    <rabbit:queue name="myQueue2" auto-declare="true"/>

    <!-- Exchange definition, bound to the queue -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Consumer bean -->
    <bean id="consumer" class="com.bearpx.rabbitmq.spring.MyConsumer"/>

    <!-- Listener container: applies the configuration below whenever a message arrives -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Which bean and which method handle messages from the queue -->
        <rabbit:listener ref="consumer" method="test" queue-names="myQueue2"/>
    </rabbit:listener-container>

</beans>
<!-- Direct exchange: messages reach myQueue2 only when the routing key equals "key2".
     The attribute next to durable is auto-delete (as on the topic exchange below);
     the previous auto-declare="false" stopped the exchange from being declared at all,
     which made durable="true" meaningless. -->
<rabbit:direct-exchange name="directExchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue2" key="key2"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- Topic exchange: messages reach myQueue2 when the routing key matches "key.*" -->
<rabbit:topic-exchange name="topicExchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding pattern="key.*" queue="myQueue2"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
测试结果
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
                           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
                           http://www.springframework.org/schema/beans
                           https://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           https://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Picks up the @Component callback/listener/util beans under this package -->
    <context:component-scan base-package="com.bearpx.rabbitmq"/>

    <!-- JSON converter bean; declared here but not referenced by the template below -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Connection factory. publisher-confirms backs the template's confirm-callback;
         publisher-returns is the documented prerequisite for the return-callback used
         together with mandatory="true", so it is enabled explicitly instead of relying
         on confirms being switched on. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.100.86"
                               port="5672"
                               username="admin"
                               password="admin"
                               virtual-host="/test"
                               publisher-confirms="true"
                               publisher-returns="true"/>

    <!-- RabbitMQ template with confirm and return callbacks; mandatory="true" makes the
         broker return unroutable messages instead of silently dropping them -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue used by the confirm/return tests -->
    <rabbit:queue name="confirm_test" auto-declare="true"/>

    <!-- Direct exchange bound to the queue; no explicit binding key is given, and the
         tests publish using the queue name as the routing key -->
    <rabbit:direct-exchange name="directExchange" id="directExchange">
        <rabbit:bindings>
            <rabbit:binding queue="confirm_test"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener container in manual acknowledge mode: the listener must ack/nack itself -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener ref="receiveConfirmTestListener" queues="confirm_test"/>
    </rabbit:listener-container>

</beans>
ConfirmCallBackListener
/**
 * Publisher-confirm callback registered under the bean name
 * {@code confirmCallBackListener}; prints every confirm outcome to standard output.
 */
@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

    /**
     * Prints the confirm result.
     *
     * @param correlationData correlation data of the published message (not used here)
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           failure reason, or {@code null} when none was supplied
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        StringBuilder report = new StringBuilder("确认回调, ack: ");
        report.append(ack).append(" ; cause: ").append(cause);
        System.out.println(report);
    }
}
ReturnCallBackListener
/**
 * Return callback registered under the bean name {@code returnCallBackListener};
 * prints the details of every message handed back by the broker.
 */
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    /**
     * Prints the returned message together with the broker's reply details.
     *
     * @param message    the returned message
     * @param replyCode  reply code supplied by the broker
     * @param replyText  reply text supplied by the broker
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        // Decode with an explicit charset: new String(byte[]) would use the platform
        // default and garble non-ASCII payloads on JVMs whose default is not UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("失败 message:" + body
                + " replyCode: " + replyCode
                + " replyText: " + replyText
                + " exchange: " + exchange
                + " routingKey: " + routingKey);
    }
}
ReceiveConfirmTestListener
/**
 * Manually acknowledging listener registered under the bean name
 * {@code receiveConfirmTestListener}.
 */
@Component("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {

    /**
     * Handles a delivered message: prints it, then acknowledges it on the channel.
     *
     * <p>If handling fails, the delivery is negatively acknowledged without requeue
     * rather than acknowledged, so a failed message is not reported as processed and
     * is not redelivered in an endless loop.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws Exception if the ack/nack cannot be sent to the broker
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消费者收到了消息: " + message);
        } catch (Exception e) {
            e.printStackTrace();
            // Processing failed: reject this single delivery, do not requeue.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        // Ack outside the try block so that a failing ack is never followed by a
        // second ack for the same delivery tag.
        channel.basicAck(deliveryTag, false);
    }
}
工具类
/**
 * Thin publishing helper registered under the bean name {@code publishUtil};
 * forwards every call to the injected {@code AmqpTemplate}.
 */
@Component("publishUtil")
public class PublishUtil {

    /** Template used for publishing; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts and publishes a message.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key to publish with
     * @param message    payload handed to the template for conversion
     */
    public void send(String exchange, String routingKey, Object message) {
        this.amqpTemplate.convertAndSend(exchange, routingKey, message);
    }
}
测试类
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class TestMain { @Autowired private PublishUtil publishUtil; private static String EXCHANGE_NAME="directExchange"; private static String QUEUE_NAME="confirm_test"; // EXCHANGE QUEUE 都对, confirm 会执行, ack=true @Test public void test1() throws Exception{ String message = "当前时间为:" + System.currentTimeMillis(); publishUtil.send(EXCHANGE_NAME,QUEUE_NAME, message); } }
test1()测试结果:
消费者收到了消息: (Body:'当前时间为:1609902615382' MessageProperties [
headers={spring_listener_return_correlation=43ec414b-86aa-4b5d-a664-db5ede6a31dd},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
redelivered=false, receivedExchange=directExchange, receivedRoutingKey=confirm_test, deliveryTag=1, consumerTag=amq.ctag-Q5xlehjut9wdFxiMEUpiGw, consumerQueue=confirm_test
])
确认回调, ack: true ; cause: null
test2()
// EXCHANGE 错误 QUEUE 对, confirm 会执行, ack=false @Test public void test2() throws Exception{ String message = "当前时间为:" + System.currentTimeMillis(); publishUtil.send(EXCHANGE_NAME+"1",QUEUE_NAME, message); }
测试结果
确认回调, ack: false ; cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'directExchange1' in vhost '/test', class-id=60, method-id=40)
test3()
// EXCHANGE 对 QUEUE 错误, confirm 会执行, ack=true,失败会执行 @Test public void test3() throws Exception{ String message = "当前时间为:" + System.currentTimeMillis(); publishUtil.send(EXCHANGE_NAME,QUEUE_NAME+"1", message); Thread.sleep(2000); }
测试结果
失败 message:当前时间为:1609902810084 replyCode: 312 replyText: NO_ROUTE exchange: directExchange routingKey: confirm_test1
确认回调, ack: true ; cause: null
test4()
// EXCHANGE QUEUE 都错误, confirm 会执行, ack=false @Test public void test4() throws Exception{ String message = "当前时间为:" + System.currentTimeMillis(); publishUtil.send(EXCHANGE_NAME+"1",QUEUE_NAME+"1", message); }
测试结果
确认回调, ack: false ; cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'directExchange1' in vhost '/test', class-id=60, method-id=40)
标签:ace one @param sys use turn top 当前时间 out
原文地址:https://www.cnblogs.com/kingdomer/p/14240304.html