码迷,mamicode.com
首页 > 编程语言 > 详细

SpringBoot 集成RabbitMQ

时间:2018-09-18 11:05:36      阅读:603      评论:0      收藏:0      [点我收藏+]

标签:amqp   ceo   mod   消息   component   wrapper   ble   service   config   

一、SpringBoot集成RabbitMQ非常简单,直接引入配置好的starter依赖,然后配置相关的信息就可以直接使用了。
1、引入依赖
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
x
1
<dependency>
2
        <groupId>org.springframework.boot</groupId>
3
        <artifactId>spring-boot-starter-amqp</artifactId>
4
</dependency>
2、配置文件配置
spring: 
    rabbitmq:
        host: 10.3.98.152
        port: 5672
        username: rabbitmq
        password: rabbitmq
        publisher-confirms: true
        publisher-returns: true
        virtual-host: /
        listener:
          simple:
            concurrency: 1
            max-concurrency: 1
          retry:
              enabled: true
        prefetchCount: 10
        concurrentConsumers: 10
x
2
21
 
1
spring: 
2
    rabbitmq:
3
        host: 10.3.98.152
4
        port: 5672
5
        username: rabbitmq
6
        password: rabbitmq
7
        publisher-confirms: true
8
        publisher-returns: true
9
        virtual-host: /
10
        listener:
11
          simple:
12
            concurrency: 1
13
            max-concurrency: 1
14
          retry:
15
              enabled: true
16
        prefetchCount: 10
17
        concurrentConsumers: 10
3、配置队列来接收消息
@Configuration
public class RabbitConfig {
 
    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }
 
}
1
10
9
10
1
2
@Configuration
3
public class RabbitConfig {
4
 
5
    @Bean
6
    public Queue Queue() {
7
        return new Queue("hello");
8
    }
9
 
10
}
4、定义消息发送者
public class HelloSender {
 
    @Autowired
    private AmqpTemplate rabbitTemplate;
 
    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
 
}
1
13
 
1
public class HelloSender {
2
 
3
    @Autowired
4
    private AmqpTemplate rabbitTemplate;
5
 
6
    public void send() {
7
        String context = "hello " + new Date();
8
        System.out.println("Sender : " + context);
9
        this.rabbitTemplate.convertAndSend("hello", context);
10
    }
11
 
12
}
13
5、定义消息消费者
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
 
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
 
}
1
11
 
1
@Component
2
@RabbitListener(queues = "hello")
3
public class HelloReceiver {
4
 
5
    @RabbitHandler
6
    public void process(String hello) {
7
        System.out.println("Receiver  : " + hello);
8
    }
9
 
10
}
11
二、动态绑定队列,并且实现消息的发送
在SpringCloud的环境下,每个客户端的队列都是不同的,此时需要动态生成队列,并且向rabbitMQ服务器注册和绑定,后台同时也需要监听该队列的接收
1、动态生成队列并且向rabbitMQ服务器注册和绑定
/**
*由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
    connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(host + ":" + port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setConnectionTimeout(10000);
    /* 如果要进行消息回调,则这里必须要设置为true */
    connectionFactory.setPublisherConfirms(publisherConfirms);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
}
/**
     * 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)
     *
     * @return 对列名的数组
     * @throws AmqpException
     * @throws IOException
     */
@Bean
public String mqMsgQueues() throws AmqpException, IOException {
    //获取我方的SCEOID
    Result enterpriseResult = idService.getEnterprise();
    String intcEnte = "";
    if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {
        Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();
        intcEnte = (String) map.get("ente_idcode");
    }
    logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);
    intcEnte = "zqb163";
    logger.info("注册的intcEnte:" + intcEnte);
    String queueName = String.format("%s.queue%d", intcEnte, 1);
    connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
    connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
    return queueName;
}
x
1
/**
2
*由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory
3
*/
4
@Bean
5
public ConnectionFactory connectionFactory() {
6
    connectionFactory = new CachingConnectionFactory();
7
    connectionFactory.setAddresses(host + ":" + port);
8
    connectionFactory.setUsername(username);
9
    connectionFactory.setPassword(password);
10
    connectionFactory.setVirtualHost(virtualHost);
11
    connectionFactory.setConnectionTimeout(10000);
12
    /* 如果要进行消息回调,则这里必须要设置为true */
13
    connectionFactory.setPublisherConfirms(publisherConfirms);
14
    connectionFactory.setPublisherReturns(true);
15
    return connectionFactory;
16
}
17
/**
18
     * 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)
19
     *
20
     * @return 对列名的数组
21
     * @throws AmqpException
22
     * @throws IOException
23
     */
24
@Bean
25
public String mqMsgQueues() throws AmqpException, IOException {
26
    //获取我方的SCEOID
27
    Result enterpriseResult = idService.getEnterprise();
28
    String intcEnte = "";
29
    if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {
30
        Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();
31
        intcEnte = (String) map.get("ente_idcode");
32
    }
33
    logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);
34
    intcEnte = "zqb163";
35
    logger.info("注册的intcEnte:" + intcEnte);
36
    String queueName = String.format("%s.queue%d", intcEnte, 1);
37
    connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
38
    connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
39
    return queueName;
40
}
2、监听队列
/**
     * 监听队列
     *
     * @return
     * @throws AmqpException
     * @throws IOException
     */
@Bean
public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(mqMsgQueues());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    //设置确认模式手工确认
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    //监听处理类
    container.setMessageListener(handleService());
    return container;
}
x
 
1
/**
2
     * 监听队列
3
     *
4
     * @return
5
     * @throws AmqpException
6
     * @throws IOException
7
     */
8
@Bean
9
public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {
10
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
11
    container.setQueueNames(mqMsgQueues());
12
    container.setExposeListenerChannel(true);
13
    container.setMaxConcurrentConsumers(1);
14
    container.setConcurrentConsumers(1);
15
    //设置确认模式手工确认
16
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
17
    //监听处理类
18
    container.setMessageListener(handleService());
19
    return container;
20
}
4、监听处理
@Service
public class HandleService implements ChannelAwareMessageListener {
      @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        byte[] body = message.getBody();
        //消息消費成功(这里必须消费成功,如果因为业务的原因没有将消息消费掉,很容易造成消息阻塞)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // do something
    }
}
1
10
1
@Service
2
public class HandleService implements ChannelAwareMessageListener {
3
      @Override
4
    public void onMessage(Message message, Channel channel) throws Exception {
5
        byte[] body = message.getBody();
6
        //消息消費成功(这里必须消费成功,如果因为业务的原因没有将消息消费掉,很容易造成消息阻塞)
7
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
8
        // do something
9
    }
10
}

SpringBoot 集成RabbitMQ

标签:amqp   ceo   mod   消息   component   wrapper   ble   service   config   

原文地址:https://www.cnblogs.com/amoszhu/p/9667181.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!