码迷,mamicode.com
首页 > 编程语言 > 详细

RabbitMQ - 02Spring整合XML形式

时间:2021-01-12 11:12:45      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:ace   one   @param   sys   use   turn   top   当前时间   out   

RabbitMQ - 02Spring整合XML形式

(1)XML形式:方式一

测试启动类

/**
 * Demo entry point: loads the Spring context from {@code rabbitmq.xml} on the classpath,
 * publishes one text message through the context's {@link RabbitTemplate}, then closes the context.
 */
public class SpringTest {
    /**
     * Sends a single message and shuts the context down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources closes the context even if bean lookup or the send throws,
        // and replaces the cast plus the deprecated destroy() call with the standard close().
        try (ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("rabbitmq.xml")) {
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            template.convertAndSend("Spring整合RabbitMQXML消息");
        }
    }
}

消费者

/**
 * Message handler that prints every received message to standard output.
 */
public class MyConsumer {

    /**
     * Receives a message and prints it.
     *
     * @param message the received message text; {@code null} is printed as "null"
     */
    public void test(String message) {
        final String line = "接收到的消息为: " + message;
        System.out.println(line);
    }
}

resource/rabbitmq.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- Connection factory: broker address, credentials and virtual host; publisher confirms are switched on. -->
    <rabbit:connection-factory id="connectionFactory"
                                 host="192.168.100.86" port="5672" username="admin" password="admin"
                                 virtual-host="/test" publisher-confirms="true"></rabbit:connection-factory>
    <!-- RabbitMQ template used for publishing; fanoutExchange is configured as its exchange. -->
    <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- Queue definition -->
    <rabbit:queue name="myQueue2" auto-declare="true"/>
    <!-- Fanout exchange definition, bound to the queue above -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue2"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Consumer bean -->
    <bean id="consumer" class="com.bearpx.rabbitmq.spring.MyConsumer"/>
    <!-- Listener container: when a message arrives, the listener configured inside is invoked -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Which bean and which method handle messages received from myQueue2 -->
        <rabbit:listener ref="consumer" method="test" queue-names="myQueue2"/>
    </rabbit:listener-container>
</beans>
    <!-- Alternative exchange declarations (direct and topic), each binding myQueue2.
         NOTE(review): this fragment appears after the closing </beans> tag; it would have to be
         placed inside the <beans> element to become part of the configuration.
         NOTE(review): the direct exchange sets auto-declare="false" while the topic exchange sets
         auto-delete="false"; confirm which attribute was intended. -->
    <rabbit:direct-exchange name="directExchange" durable="true" auto-declare="false">
        <rabbit:bindings>
            <!-- Direct binding: myQueue2 is bound with the key "key2" -->
            <rabbit:binding queue="myQueue2" key="key2"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <rabbit:topic-exchange name="topicExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <!-- Topic binding: myQueue2 is bound with the pattern "key.*" -->
            <rabbit:binding pattern="key.*" queue="myQueue2"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

测试结果

 技术图片

 

(2)XML形式:方式二

(2.1)resource/applicationContext.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Component scan of com.bearpx.rabbitmq; the callback/listener beans referenced below by name
         are expected to be registered this way. -->
    <context:component-scan base-package="com.bearpx.rabbitmq"></context:component-scan>
    <!-- NOTE(review): this converter bean is not referenced by the template below
         (no message-converter attribute); confirm whether it should be wired in. -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>
    <!-- Connection factory: broker address, credentials and virtual host; publisher confirms are switched on. -->
    <rabbit:connection-factory id="connectionFactory"
                                 host="192.168.100.86" port="5672" username="admin" password="admin"
                                 virtual-host="/test" publisher-confirms="true"></rabbit:connection-factory>
    <!-- RabbitMQ template with confirm and return callbacks; mandatory="true" is set on the template. -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue and direct exchange used by the confirm/return tests; the binding declares no explicit key. -->
    <rabbit:queue name="confirm_test" auto-declare="true"/>
    <rabbit:direct-exchange name="directExchange" id="directExchange">
        <rabbit:bindings>
            <rabbit:binding queue="confirm_test"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener container in manual acknowledge mode; the listener bean acknowledges each delivery itself. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener ref="receiveConfirmTestListener" queues="confirm_test"/>
    </rabbit:listener-container>
</beans>

ConfirmCallBackListener

/**
 * Publisher-confirm callback registered as the Spring bean {@code confirmCallBackListener};
 * prints the ack flag and cause of every confirm it receives.
 */
@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

    /**
     * Prints the confirm outcome to standard output.
     *
     * @param correlationData correlation data passed with the confirm (not used here)
     * @param ack             ack flag reported by the confirm
     * @param cause           cause text reported by the confirm; printed as-is, including {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        final StringBuilder line = new StringBuilder("确认回调, ack: ");
        line.append(ack).append(" ; cause: ").append(cause);
        System.out.println(line);
    }
}

ReturnCallBackListener 

/**
 * Return callback registered as the Spring bean {@code returnCallBackListener};
 * prints the details of every message handed back to the publisher.
 */
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    /**
     * Prints the returned message body together with the reply code, reply text, exchange and routing key.
     *
     * @param message    the returned message; its body is decoded as UTF-8 text
     * @param replyCode  reply code passed to the callback
     * @param replyText  reply text passed to the callback
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 instead of the platform default charset, so the non-ASCII
        // payload prints correctly regardless of the JVM's file.encoding setting.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("失败 message:" + body + " replyCode: " + replyCode + " replyText: " + replyText +
                " exchange: " + exchange + " routingKey: " + routingKey);
    }
}

ReceiveConfirmTestListener 

/**
 * Manual-acknowledge listener registered as the Spring bean {@code receiveConfirmTestListener}.
 */
@Component("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
    /**
     * Invoked for each received message: prints it and acknowledges the delivery.
     * If handling fails, the delivery is negatively acknowledged without requeue instead of
     * being reported to the broker as successfully processed.
     *
     * @param message the received message
     * @param channel the channel used to acknowledge the delivery
     * @throws Exception if acknowledging (or negatively acknowledging) the delivery fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消费者收到了消息: " + message);
        } catch (Exception e) {
            e.printStackTrace();
            // Handling failed: reject this single delivery (multiple=false) without requeue
            // (requeue=false) rather than positively acking a message that was not processed.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        // Ack only after successful handling; a failing ack now propagates to the container
        // instead of being retried with the very same call inside the catch block.
        channel.basicAck(deliveryTag, false);
    }
}

工具类

/**
 * Publishing helper registered as the Spring bean {@code publishUtil};
 * forwards messages to the injected {@link AmqpTemplate}.
 */
@Component("publishUtil")
public class PublishUtil {

    /** Template used for every publish; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts and sends a message to the given exchange with the given routing key.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key to publish with
     * @param message    payload handed to the template for conversion and sending
     */
    public void send(String exchange, String routingKey, Object message) {
        this.amqpTemplate.convertAndSend(exchange, routingKey, message);
    }
}

测试类

/**
 * Integration tests for publisher confirms/returns, run against the context defined in
 * {@code applicationContext.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestMain {

    @Autowired
    private PublishUtil publishUtil;

    // Constants are final so no test can reassign them by accident.
    private static final String EXCHANGE_NAME = "directExchange";
    private static final String QUEUE_NAME = "confirm_test";

    /**
     * Exchange and queue name (used as the routing key) are both correct:
     * the confirm callback runs with ack=true.
     */
    @Test
    public void test1() throws Exception {
        String message = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(EXCHANGE_NAME, QUEUE_NAME, message);
    }
}

test1()测试结果:

消费者收到了消息: (Body:'当前时间为:1609902615382' MessageProperties [
headers={spring_listener_return_correlation=43ec414b-86aa-4b5d-a664-db5ede6a31dd},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
redelivered=false, receivedExchange=directExchange, receivedRoutingKey=confirm_test, deliveryTag=1, consumerTag=amq.ctag-Q5xlehjut9wdFxiMEUpiGw, consumerQueue=confirm_test
])
确认回调, ack: true ; cause: null

 

test2()

    /**
     * Wrong exchange, correct queue name (routing key):
     * the confirm callback runs with ack=false.
     */
    @Test
    public void test2() throws Exception {
        final String wrongExchange = EXCHANGE_NAME + "1";
        final String payload = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(wrongExchange, QUEUE_NAME, payload);
    }

测试结果

确认回调, ack: false ; cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'directExchange1' in vhost '/test', class-id=60, method-id=40)

 

test3()

    /**
     * Correct exchange, wrong queue name (routing key):
     * the confirm callback runs with ack=true and the return (failure) callback runs as well.
     */
    @Test
    public void test3() throws Exception {
        final String wrongRoutingKey = QUEUE_NAME + "1";
        final String payload = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(EXCHANGE_NAME, wrongRoutingKey, payload);
        // Keep the test alive for two seconds after publishing.
        Thread.sleep(2000);
    }

测试结果

失败 message:当前时间为:1609902810084 replyCode: 312 replyText: NO_ROUTE exchange: directExchange routingKey: confirm_test1
确认回调, ack: true ; cause: null

 

test4()

    /**
     * Wrong exchange and wrong queue name (routing key):
     * the confirm callback runs with ack=false.
     */
    @Test
    public void test4() throws Exception {
        final String wrongExchange = EXCHANGE_NAME + "1";
        final String wrongRoutingKey = QUEUE_NAME + "1";
        final String payload = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(wrongExchange, wrongRoutingKey, payload);
    }

测试结果 

确认回调, ack: false ; cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'directExchange1' in vhost '/test', class-id=60, method-id=40)

 

RabbitMQ - 02Spring整合XML形式

标签:ace   one   @param   sys   use   turn   top   当前时间   out   

原文地址:https://www.cnblogs.com/kingdomer/p/14240304.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!