码迷,mamicode.com
首页 > 其他好文 > 详细

Redis进阶例子

时间:2019-05-15 16:43:46      阅读:97      评论:0      收藏:0      [点我收藏+]

标签:package   rdl   framework   接收   led   contents   server   sed   throws   

工作中用到的RabbitMQ例子 , 但是最后没有用 , 用的CMQ , 顺便说下CMQ社区真的少 , 并且功能少 .

一、消息体

技术图片
package com.bootdo.common.rabbitmq.batchsendsms;

import com.alibaba.fastjson.JSON;
import com.bootdo.common.utils.UUIDGenerator;

import java.io.Serializable;
import java.util.Date;
import java.util.Map;

/**
 * MQ消息体
 * @author yuduojia
 * @date 2018/8/1 10:42
 */
public class BacthSendSMSMessage implements Serializable{
    private static final long serialVersionUID = 1L;

    private Integer intRun;//当前次数
    private Integer total;//总次数
    private String productCode; // 生产者代码
    private String consumerCode; // 消费者代码
    private String messageId; // 消息唯一标识
    private Date created; // 消息发送时间
    private Map<String, Object> bussinessBody; // 消息体,封装业务数据

    private BacthSendSMSMessage() {
        super();
    }

    public BacthSendSMSMessage(Integer intRun, Integer total, String productCode, String consumerCode, Map<String, Object> bussinessBody) {
        this.intRun = intRun;
        this.total = total;
        this.productCode = productCode;
        this.consumerCode = consumerCode;
        this.bussinessBody = bussinessBody;
    }

    public static String productMQMessage(Integer intRun, Integer total, String productCode, String consumerCode, Map<String, Object> bussinessBody) {
        BacthSendSMSMessage mqObj = new BacthSendSMSMessage(intRun, total, productCode, consumerCode, bussinessBody);
        mqObj.setCreated(new Date());
        mqObj.setMessageId(generatSeriaeNo());
        return JSON.toJSONString(mqObj);
    }

    //生成消息唯一标识
    private static String generatSeriaeNo() {
        return UUIDGenerator.generate();
    }

    public String getProductCode() {
        return productCode;
    }

    public void setProductCode(String productCode) {
        this.productCode = productCode;
    }

    public String getConsumerCode() {
        return consumerCode;
    }

    public void setConsumerCode(String consumerCode) {
        this.consumerCode = consumerCode;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public Date getCreated() {
        return created;
    }

    public void setCreated(Date created) {
        this.created = created;
    }

    public Map<String, Object> getBussinessBody() {
        return bussinessBody;
    }

    public void setBussinessBody(Map<String, Object> bussinessBody) {
        this.bussinessBody = bussinessBody;
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public Integer getIntRun() {
        return intRun;
    }

    public void setIntRun(Integer intRun) {
        this.intRun = intRun;
    }

    public Integer getTotal() {
        return total;
    }

    public void setTotal(Integer total) {
        this.total = total;
    }

    @Override
    public String toString() {
        return "BacthSendSMSMessage{" +
                "intRun=" + intRun +
                ", total=" + total +
                ", productCode=" + productCode +
                ", consumerCode=" + consumerCode +
                ", messageId=‘" + messageId + ‘\‘‘ +
                ", created=" + created +
                ", bussinessBody=" + bussinessBody +
                ‘}‘;
    }
}
View Code

二、发布者

技术图片
package com.bootdo.common.rabbitmq.batchsendsms;

import com.bootdo.server.vo.SendModel;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.UUID;

@Component
public class BatchSendSMSSender {
    private static final Logger logger = LoggerFactory.getLogger(BatchSendSMSSender.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(List<SendModel> list, String channel) {
        int count = 0;
        for (int i = 0; i < list.size(); i++) {
            Map<String, Object> bussiness = Maps.newHashMap();
            bussiness.put("productId", 15);
            bussiness.put("companyId", 100); //B公司id
            bussiness.put("isSmallPerson", 1); //1 or 0
            bussiness.put("assignType", 1);
            bussiness.put("bookNum", 1);
            bussiness.put("bookAmount", 100);
            bussiness.put("channel",channel);
            bussiness.put("templeteCode",list.get(i).getTempleteCode());
            bussiness.put("templeteParam",list.get(i).getTempleteParam());
            bussiness.put("phone",list.get(i).getPhone());


        String msgId = UUID.randomUUID().toString();

        String messageBody = BacthSendSMSMessage.productMQMessage(i+1,list.size(),"pro","pro",bussiness);
        Message message = MessageBuilder.withBody(messageBody.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setCorrelationIdString(msgId).build();
        /*将 msgId和 CorrelationData绑定*/
        CorrelationData correlationData = new CorrelationData(msgId);

        rabbitTemplate.convertAndSend("exchange", "ttd.trust.product", message);
        System.out.println("ProductTopicSender : " + messageBody);
        }
    }

}
View Code

三、订阅者(监听)

技术图片
package com.bootdo.common.rabbitmq.batchsendsms;

import com.alibaba.fastjson.JSON;

import com.bootdo.common.config.ConfigConstants;
import com.bootdo.server.service.SMSRetryProxy;
import com.bootdo.server.service.SMSService;
import com.bootdo.server.service.SMSToSend;
import com.bootdo.sms.domain.ModelDO;
import com.bootdo.sms.domain.SmsSendLogDO;
import com.bootdo.sms.service.ModelService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * MQ消息监听
 * @author yuduojia
 * @date 2018/7/30 14:53
 */
@Component
@RabbitListener(queues = "ttd.trust.product")
public class BctchSendSMSReceiver {
    private final Logger logger = LoggerFactory.getLogger(BctchSendSMSReceiver.class);
    @Autowired
    private ModelService modelService;
    @Autowired
    private ConnectionFactory connectionff;

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionff);
        container.setQueueNames("ttd.trust.product");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {

               try {
                   logger.info("消费端接收到消息:" + message.getMessageProperties() + ":" + new String(message.getBody()));
                   logger.info("topic:"+message.getMessageProperties().getReceivedRoutingKey());
                   byte[] body = message.getBody();
                   String jsonString =  new String(body);
                   logger.info("BctchSendSMSReceiver  : " + jsonString);
                   BacthSendSMSMessage msg = JSON.parseObject(jsonString, BacthSendSMSMessage.class);
                   Map<String, Object> bussinessBody = msg.getBussinessBody();
                   List<String> templateParams = (List<String>)bussinessBody.get("templeteParam");
                   String phone = (String) bussinessBody.get("phone");

                   ModelDO modelDO = modelService.getbyCode((String)bussinessBody.get("templeteCode"));
                   String text = modelDO.getSendModel();
                   String[] split = text.split("#");
                   String model = "";
                   for (int i = 0; i < split.length; i++) {
                       if(i%2==0){
                           model = model + split[i];
                       }
                       else if(i%2!=0){
                           model = model + templateParams.get((i-1)/2);
                       }
                   }
                   SMSToSend toDo = new SMSRetryProxy().getInstance(new SMSService());
                   Map<String,Object> map = new HashMap<String,Object>();
                   map.put("channel",(String) bussinessBody.get("channel"));
                   map.put("sendTemplateCode",(String)bussinessBody.get("templeteCode"));
                   map.put("templeteContents",templateParams);
                   map.put("mobile",phone);
                   map.put("apikey", ConfigConstants.SEND_SMS_APIKEY2);
                   // map.put("text","【娄维伟】尊敬的张帅东,您有一个工单号为:222991011111111111的待处理工单,请您及时处理!");
                   map.put("text",model);
                   map.put("smsProvider","云片");
                   Map<String,Object> return1 = toDo.singleSend(map);
                   logger.info("SendSMSController——sendSMS——短信发送详细信息为:"+return1.toString());
                   SmsSendLogDO smsSendLogDO = (SmsSendLogDO)return1.get("smsSendLogDO");

                   boolean preRet = preEventHandler(msg);
                   if (preRet == false) {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费一条
                        return ;
                   }

                   boolean postRet = postEventHandler(msg);
                   if (postRet == false) {
                       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费一条
                       return ;
                   }
                   //记录日志
                   afterEventHandler(msg);
                   channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费一条
                   //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    e.printStackTrace();
                    if (message.getMessageProperties().getRedelivered()) {
                        logger.error("消息已重复处理失败,拒绝再次接收__"+message.getMessageProperties().getDeliveryTag());
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
                    } else {
                        logger.error("消息即将再次返回队列处理__"+message.getMessageProperties().getDeliveryTag());
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
                    }
                }
            }
        });
        return container;
    }

    private void recordLogMQ(BacthSendSMSMessage message, Integer state) {
        /*LogMqMessage log = new LogMqMessage();
        log.setMessageId(message.getMessageId());
        log.setProductCode(message.getProductCode());
        log.setConsumerCode(message.getConsumerCode());
        log.setEvent(message.getEvent());
        log.setBussinessBody(JSON.toJSONString(message));
        log.setState(state);

        logMqMessageService.insertEntry(log);*/
        System.out.println("记录日志");
    }

    /**
     * 消息体检查
     * @param message
     * @return Map
     */
    private boolean preEventHandler(BacthSendSMSMessage message) {


        return false;
    }

    /**
     * 业务处理
     * @param message
     * @return
     */
    private boolean postEventHandler(BacthSendSMSMessage message) {


        return true;
    }

    /**
     * 记录消息日志
     * @param message
     */
    private void afterEventHandler(BacthSendSMSMessage message) {
        recordLogMQ(message, 1);
    }

}
View Code

四、RabbitMQConfig

技术图片
package com.bootdo.common.rabbitmq.batchsendsms.config;

import com.bootdo.common.rabbitmq.confirm.MsgSendConfirmCallBack;
import com.bootdo.common.rabbitmq.confirm.MsgSendReturnCallback;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author yuduojia
 * @date 2018/8/3 15:14
 */
@Configuration
public class RabbitMqConfig {

    public static final String ROUTING_KEY_1 = "batch_send_key1";
    public static final String ROUTING_KEY_2 = "batch_send_key2";

    @Autowired
    private QueueConfig queueConfig;

    @Autowired
    private ExchangeConfig exchangeConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * 将消息队列1和交换机1进行绑定,指定队列key1
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_1);
    }

    /**
     * 将消息队列2和交换机1进行绑定,指定队列key2
     */
    @Bean
    public Binding binding_two() {
        return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_2);
    }

    /**
     * 定义rabbit template用于数据的接收和发送
     * 可以设置消息确认机制和回调
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // template.setMessageConverter(); 可以自定义消息转换器  默认使用的JDK的,所以消息对象需要实现Serializable
        // template.setMessageConverter(new Jackson2JsonMessageConverter());

        template.setConfirmCallback(msgSendConfirmCallBack());
        template.setReturnCallback(msgSendReturnCallback());
        template.setMandatory(true);
        return template;
    }

    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack(){
        return new MsgSendConfirmCallBack();
    }

    @Bean
    public MsgSendReturnCallback msgSendReturnCallback(){
        return new MsgSendReturnCallback();
    }

}
View Code

五、QueueConfig

技术图片
package com.bootdo.common.rabbitmq.batchsendsms.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 队列配置  可以配置多个队列
 * @author zhuzhe
 * @date 2018/5/25 13:25
 * @email 1529949535@qq.com
 */
@Configuration
public class QueueConfig {

    /*对列名称*/
    public static final String BATCH_SEND_QUEUE_NAME1 = "batch_send_queue1";
    public static final String BATCH_SEND_QUEUE_NAME2 = "batch_send_queue2";
    //public static final String BATCH_SEND_QUEUE_NAME3 = "batch_send_queue3";

    @Bean
    public Queue firstQueue() {
        return new Queue(BATCH_SEND_QUEUE_NAME1,true,false,false);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(BATCH_SEND_QUEUE_NAME2,true,false,false);
    }

    /*@Bean
    public Queue thirdQueue() {
        return new Queue(BATCH_SEND_QUEUE_NAME3,true,false,false);
    }*/
}
View Code

六、ExchangeConfig

技术图片
package com.bootdo.common.rabbitmq.batchsendsms.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 消息交换机配置  可以配置多个
 * @author zhuzhe
 * @date 2018/5/25 15:40
 * @email 1529949535@qq.com
 */
@Configuration
public class ExchangeConfig {

    public final String EXCHANGE_01 = "batch_send_exchange";

    @Bean
    public DirectExchange directExchange(){
        DirectExchange directExchange = new DirectExchange(EXCHANGE_01,true,false);
        return directExchange;
    }
}
View Code

七、确认后回调

技术图片
package com.bootdo.common.rabbitmq.confirm;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/**
 * 确认后回调  massage2exchange
 * @author yuduojia
 * @date 2018/8/2 15:49
 */
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息确认成功cause:"+cause);
        } else {
            //处理丢失的消息
            System.out.println("消息确认失败:"+correlationData.getId()+"#cause:"+cause);
        }
    }
}
View Code

八、失败后回调

技术图片
package com.bootdo.common.rabbitmq.confirm;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * 失败后return回调  exchange2queue
 * @author yuduojia
 * @date 2018/8/2 15:48
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        System.out.println("确认后回调return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:"
                + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }
}
View Code

 如果实际用需要优化 , 我这个没有优化就被砍掉啦 , 很遗憾 等下次用到再更新,确认回调机制 用redis做就可以。

  如果有疑问或者建议欢迎评论。

Redis进阶例子

标签:package   rdl   framework   接收   led   contents   server   sed   throws   

原文地址:https://www.cnblogs.com/TimeSay/p/10869653.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!