码迷,mamicode.com
首页 > 编程语言 > 详细

spring集成RabbitMQ配置文件详解(生产者和消费者)

时间:2018-09-08 12:24:20      阅读:1890      评论:0      收藏:0      [点我收藏+]

标签:指定   部分   文件   rop   简单   exce   private   container   标签   

1,首先引入配置文件org.springframework.amqp,如下:

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.1.RELEASE</version>
        </dependency>

2,准备工作:安装好rabbitmq,并在项目中增加配置文件   rabbit.properties 内容如下:

rmq.ip=192.188.113.114   
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin

3,配置spring-rabbitmq.xml,内容如下:

<!-- 公共部分 -->
<!-- 创建连接类 连接安装好的 rabbitmq -->
<bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="localhost" />    
    <property name="username" value="${rmq.manager.user}" />    
    <property name="password" value="${rmq.manager.password}" />   
    <property name="host" value="${rmq.ip}" />   
    <property name="port" value="${rmq.port}" />   
</bean>
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义消息队列,durable:是否持久化;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" />

<!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心 -->
<rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding>
        <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding>
        <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- 生产者部分 -->
<!-- 发送消息的producer类,也就是生产者 -->
<bean id="msgProducer" class="com.asdf.sdf.ClassA">
    <!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 -->
    <property name="queueName" value="{alert.queue.1}"/>
</bean>

<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
<!-- 或者配置jackson -->
<!--
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-->

<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

<!-- 消费者部分 -->
<!-- 自定义接口类 -->
<bean id="testHandler" class="com.rabbit.TestHandler"></bean>

<!-- 用于消息的监听的代理类MessageListenerAdapter -->
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
        <!-- 类名 -->
     <constructor-arg ref="testHandler" /> <!-- 方法名 -->
    
<property name="defaultListenerMethod" value="handlerTest"></property><property name="messageConverter" ref="jsonMessageConverter"></property> </bean> <!-- 配置监听acknowledeg="manual"设置手动应答.当消息处理失败时:会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"> <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" />
    
<rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
     <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
</rabbit:listener-container>

4,生产者(发送端)代码:

@Resource  
    private RabbitTemplate rabbitTemplate;
    private String queueName;  
    public void sendMessage(CommonMessage msg){
             try {  
                  logger.error("发送信息开始");
                  System.out.println(rabbitTemplate.getConnectionFactory().getHost());  
                 //发送信息  queueName交换机,就是上面的routingKey msg.getSource() 为 test_key 
                 rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);
                 logger.error("发送信息结束");
             } catch (Exception e) {  
                 e.printStackTrace();
             }
        }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

 

5,消费端代码:TestHandler 类

public class TestHandler  {
    @Override
    public void handlerTest(CommonMessage commonMessage) {
        System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
    }
}

本文转自:

https://blog.csdn.net/nandao158/article/details/81065892

https://www.cnblogs.com/LipeiNet/p/6079427.html

spring集成RabbitMQ配置文件详解(生产者和消费者)

标签:指定   部分   文件   rop   简单   exce   private   container   标签   

原文地址:https://www.cnblogs.com/nizuimeiabc1/p/9608763.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!