标签:amqp ceo mod 消息 component wrapper ble service config
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId></dependency>spring: 
    rabbitmq:
        host: 10.3.98.152
        port: 5672
        username: rabbitmq
        password: rabbitmq
        publisher-confirms: true
        publisher-returns: true
        virtual-host: /
        listener:
          simple:
            concurrency: 1
            max-concurrency: 1
          retry:
              enabled: true
        prefetchCount: 10
        concurrentConsumers: 10
spring    rabbitmq        host10.3.98.152        port5672        usernamerabbitmq        passwordrabbitmq        publisher-confirmstrue        publisher-returnstrue        virtual-host/        listener          simple            concurrency1            max-concurrency1          retry              enabledtrue        prefetchCount10        concurrentConsumers10@Configuration
public class RabbitConfig {
 
    /**
     * Registers a queue bean for the queue named "hello".
     *
     * <p>The method name (and therefore the bean name) is intentionally left as
     * {@code Queue}; renaming it would change the bean name seen by the container.
     *
     * @return a new queue definition named "hello"
     */
    @Bean
    public Queue Queue() {
        final String queueName = "hello";
        return new Queue(queueName);
    }
 
}public class RabbitConfig {         public Queue Queue() {        return new Queue("hello");    } }public class HelloSender {
 
    @Autowired
    private AmqpTemplate rabbitTemplate;
 
    /**
     * Builds a greeting that embeds the current date, echoes it to standard output,
     * and hands it to the injected template together with the key "hello".
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello " + now;
        System.out.println("Sender : " + payload);
        this.rabbitTemplate.convertAndSend("hello", payload);
    }
 
}
public class HelloSender {         private AmqpTemplate rabbitTemplate;     public void send() {        String context = "hello " + new Date();        System.out.println("Sender : " + context);        this.rabbitTemplate.convertAndSend("hello", context);    } }@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
 
    /**
     * Handles a text message delivered to this listener by printing it to standard output.
     *
     * @param hello the received message text
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver  : " + hello;
        System.out.println(line);
    }
 
}
(queues = "hello")public class HelloReceiver {         public void process(String hello) {        System.out.println("Receiver  : " + hello);    } }/**
*由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory
*/
/**
 * Creates the caching connection factory by hand, stores it in the
 * {@code connectionFactory} field, and exposes it as a bean.
 *
 * <p>The factory is built manually because other code in this configuration uses the
 * connection directly to declare and bind queues on a channel.
 *
 * @return the configured connection factory (the same instance held by the field)
 */
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    // Publish the instance to the field first, exactly as before, then configure it.
    connectionFactory = factory;

    String address = host + ":" + port;
    factory.setAddresses(address);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setConnectionTimeout(10000);

    // Publisher confirms must be switched on for confirm callbacks to be delivered.
    factory.setPublisherConfirms(publisherConfirms);
    // Publisher returns are always enabled here, independent of configuration.
    factory.setPublisherReturns(true);
    return factory;
}
/**
     * 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)
     *
     * @return 对列名的数组
     * @throws AmqpException
     * @throws IOException
     */
@Bean
public String mqMsgQueues() throws AmqpException, IOException {
    //获取我方的SCEOID
    Result enterpriseResult = idService.getEnterprise();
    String intcEnte = "";
    if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {
        Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();
        intcEnte = (String) map.get("ente_idcode");
    }
    logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);
    intcEnte = "zqb163";
    logger.info("注册的intcEnte:" + intcEnte);
    String queueName = String.format("%s.queue%d", intcEnte, 1);
    connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
    connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
    return queueName;
}x
/***由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory*/public ConnectionFactory connectionFactory() {    connectionFactory = new CachingConnectionFactory();    connectionFactory.setAddresses(host + ":" + port);    connectionFactory.setUsername(username);    connectionFactory.setPassword(password);    connectionFactory.setVirtualHost(virtualHost);    connectionFactory.setConnectionTimeout(10000);    /* 如果要进行消息回调,则这里必须要设置为true */    connectionFactory.setPublisherConfirms(publisherConfirms);    connectionFactory.setPublisherReturns(true);    return connectionFactory;}/**     * 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)     *     * @return 对列名的数组     * @throws AmqpException     * @throws IOException     */public String mqMsgQueues() throws AmqpException, IOException {    //获取我方的SCEOID    Result enterpriseResult = idService.getEnterprise();    String intcEnte = "";    if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {        Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();        intcEnte = (String) map.get("ente_idcode");    }    logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);    intcEnte = "zqb163";    logger.info("注册的intcEnte:" + intcEnte);    String queueName = String.format("%s.queue%d", intcEnte, 1);    connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);    connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);    return queueName;}/**
     * 监听队列
     *
     * @return
     * @throws AmqpException
     * @throws IOException
     */
@Bean
public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(mqMsgQueues());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    //设置确认模式手工确认
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    //监听处理类
    container.setMessageListener(handleService());
    return container;
}x
/**     * 监听队列     *     * @return     * @throws AmqpException     * @throws IOException     */public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);    container.setQueueNames(mqMsgQueues());    container.setExposeListenerChannel(true);    container.setMaxConcurrentConsumers(1);    container.setConcurrentConsumers(1);    //设置确认模式手工确认    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);    //监听处理类    container.setMessageListener(handleService());    return container;}@Service
public class HandleService implements ChannelAwareMessageListener {
      @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Raw payload; not used yet because the business handling below is a placeholder.
        byte[] body = message.getBody();

        // Acknowledge this single delivery (second argument false = not multiple).
        // The message must be acknowledged here: if business logic leaves it
        // unacknowledged, messages can easily back up and block the queue.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
        // do something
    }
}public class HandleService implements ChannelAwareMessageListener {          public void onMessage(Message message, Channel channel) throws Exception {        byte[] body = message.getBody();        //消息消費成功(这里必须消费成功,如果因为业务的原因没有将消息消费掉,很容易造成消息阻塞)        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        // do something    }}标签:amqp ceo mod 消息 component wrapper ble service config
原文地址:https://www.cnblogs.com/amoszhu/p/9667181.html