标签:package rdl framework 接收 led contents server sed throws
这是工作中用到的 RabbitMQ 例子,不过最终没有采用,而是改用了 CMQ。顺便说一句,CMQ 的社区资料真的很少,功能也比较少。
一、消息体
package com.bootdo.common.rabbitmq.batchsendsms; import com.alibaba.fastjson.JSON; import com.bootdo.common.utils.UUIDGenerator; import java.io.Serializable; import java.util.Date; import java.util.Map; /** * MQ消息体 * @author yuduojia * @date 2018/8/1 10:42 */ public class BacthSendSMSMessage implements Serializable{ private static final long serialVersionUID = 1L; private Integer intRun;//当前次数 private Integer total;//总次数 private String productCode; // 生产者代码 private String consumerCode; // 消费者代码 private String messageId; // 消息唯一标识 private Date created; // 消息发送时间 private Map<String, Object> bussinessBody; // 消息体,封装业务数据 private BacthSendSMSMessage() { super(); } public BacthSendSMSMessage(Integer intRun, Integer total, String productCode, String consumerCode, Map<String, Object> bussinessBody) { this.intRun = intRun; this.total = total; this.productCode = productCode; this.consumerCode = consumerCode; this.bussinessBody = bussinessBody; } public static String productMQMessage(Integer intRun, Integer total, String productCode, String consumerCode, Map<String, Object> bussinessBody) { BacthSendSMSMessage mqObj = new BacthSendSMSMessage(intRun, total, productCode, consumerCode, bussinessBody); mqObj.setCreated(new Date()); mqObj.setMessageId(generatSeriaeNo()); return JSON.toJSONString(mqObj); } //生成消息唯一标识 private static String generatSeriaeNo() { return UUIDGenerator.generate(); } public String getProductCode() { return productCode; } public void setProductCode(String productCode) { this.productCode = productCode; } public String getConsumerCode() { return consumerCode; } public void setConsumerCode(String consumerCode) { this.consumerCode = consumerCode; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public Date getCreated() { return created; } public void setCreated(Date created) { this.created = created; } public Map<String, Object> getBussinessBody() { return bussinessBody; } public 
void setBussinessBody(Map<String, Object> bussinessBody) { this.bussinessBody = bussinessBody; } public static long getSerialVersionUID() { return serialVersionUID; } public Integer getIntRun() { return intRun; } public void setIntRun(Integer intRun) { this.intRun = intRun; } public Integer getTotal() { return total; } public void setTotal(Integer total) { this.total = total; } @Override public String toString() { return "BacthSendSMSMessage{" + "intRun=" + intRun + ", total=" + total + ", productCode=" + productCode + ", consumerCode=" + consumerCode + ", messageId=‘" + messageId + ‘\‘‘ + ", created=" + created + ", bussinessBody=" + bussinessBody + ‘}‘; } }
二、发布者
package com.bootdo.common.rabbitmq.batchsendsms; import com.bootdo.server.vo.SendModel; import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; import java.util.UUID; @Component public class BatchSendSMSSender { private static final Logger logger = LoggerFactory.getLogger(BatchSendSMSSender.class); @Autowired private RabbitTemplate rabbitTemplate; public void send(List<SendModel> list, String channel) { int count = 0; for (int i = 0; i < list.size(); i++) { Map<String, Object> bussiness = Maps.newHashMap(); bussiness.put("productId", 15); bussiness.put("companyId", 100); //B公司id bussiness.put("isSmallPerson", 1); //1 or 0 bussiness.put("assignType", 1); bussiness.put("bookNum", 1); bussiness.put("bookAmount", 100); bussiness.put("channel",channel); bussiness.put("templeteCode",list.get(i).getTempleteCode()); bussiness.put("templeteParam",list.get(i).getTempleteParam()); bussiness.put("phone",list.get(i).getPhone()); String msgId = UUID.randomUUID().toString(); String messageBody = BacthSendSMSMessage.productMQMessage(i+1,list.size(),"pro","pro",bussiness); Message message = MessageBuilder.withBody(messageBody.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setCorrelationIdString(msgId).build(); /*将 msgId和 CorrelationData绑定*/ CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend("exchange", "ttd.trust.product", message); System.out.println("ProductTopicSender : " + messageBody); } } }
三、订阅者(监听)
package com.bootdo.common.rabbitmq.batchsendsms; import com.alibaba.fastjson.JSON; import com.bootdo.common.config.ConfigConstants; import com.bootdo.server.service.SMSRetryProxy; import com.bootdo.server.service.SMSService; import com.bootdo.server.service.SMSToSend; import com.bootdo.sms.domain.ModelDO; import com.bootdo.sms.domain.SmsSendLogDO; import com.bootdo.sms.service.ModelService; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.List; import java.util.Map; /** * MQ消息监听 * @author yuduojia * @date 2018/7/30 14:53 */ @Component @RabbitListener(queues = "ttd.trust.product") public class BctchSendSMSReceiver { private final Logger logger = LoggerFactory.getLogger(BctchSendSMSReceiver.class); @Autowired private ModelService modelService; @Autowired private ConnectionFactory connectionff; @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionff); container.setQueueNames("ttd.trust.product"); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { try { 
logger.info("消费端接收到消息:" + message.getMessageProperties() + ":" + new String(message.getBody())); logger.info("topic:"+message.getMessageProperties().getReceivedRoutingKey()); byte[] body = message.getBody(); String jsonString = new String(body); logger.info("BctchSendSMSReceiver : " + jsonString); BacthSendSMSMessage msg = JSON.parseObject(jsonString, BacthSendSMSMessage.class); Map<String, Object> bussinessBody = msg.getBussinessBody(); List<String> templateParams = (List<String>)bussinessBody.get("templeteParam"); String phone = (String) bussinessBody.get("phone"); ModelDO modelDO = modelService.getbyCode((String)bussinessBody.get("templeteCode")); String text = modelDO.getSendModel(); String[] split = text.split("#"); String model = ""; for (int i = 0; i < split.length; i++) { if(i%2==0){ model = model + split[i]; } else if(i%2!=0){ model = model + templateParams.get((i-1)/2); } } SMSToSend toDo = new SMSRetryProxy().getInstance(new SMSService()); Map<String,Object> map = new HashMap<String,Object>(); map.put("channel",(String) bussinessBody.get("channel")); map.put("sendTemplateCode",(String)bussinessBody.get("templeteCode")); map.put("templeteContents",templateParams); map.put("mobile",phone); map.put("apikey", ConfigConstants.SEND_SMS_APIKEY2); // map.put("text","【娄维伟】尊敬的张帅东,您有一个工单号为:222991011111111111的待处理工单,请您及时处理!"); map.put("text",model); map.put("smsProvider","云片"); Map<String,Object> return1 = toDo.singleSend(map); logger.info("SendSMSController——sendSMS——短信发送详细信息为:"+return1.toString()); SmsSendLogDO smsSendLogDO = (SmsSendLogDO)return1.get("smsSendLogDO"); boolean preRet = preEventHandler(msg); if (preRet == false) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费一条 return ; } boolean postRet = postEventHandler(msg); if (postRet == false) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费一条 return ; } //记录日志 afterEventHandler(msg); 
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费一条 //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); if (message.getMessageProperties().getRedelivered()) { logger.error("消息已重复处理失败,拒绝再次接收__"+message.getMessageProperties().getDeliveryTag()); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息 } else { logger.error("消息即将再次返回队列处理__"+message.getMessageProperties().getDeliveryTag()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列 } } } }); return container; } private void recordLogMQ(BacthSendSMSMessage message, Integer state) { /*LogMqMessage log = new LogMqMessage(); log.setMessageId(message.getMessageId()); log.setProductCode(message.getProductCode()); log.setConsumerCode(message.getConsumerCode()); log.setEvent(message.getEvent()); log.setBussinessBody(JSON.toJSONString(message)); log.setState(state); logMqMessageService.insertEntry(log);*/ System.out.println("记录日志"); } /** * 消息体检查 * @param message * @return Map */ private boolean preEventHandler(BacthSendSMSMessage message) { return false; } /** * 业务处理 * @param message * @return */ private boolean postEventHandler(BacthSendSMSMessage message) { return true; } /** * 记录消息日志 * @param message */ private void afterEventHandler(BacthSendSMSMessage message) { recordLogMQ(message, 1); } }
四、RabbitMQConfig
package com.bootdo.common.rabbitmq.batchsendsms.config; import com.bootdo.common.rabbitmq.confirm.MsgSendConfirmCallBack; import com.bootdo.common.rabbitmq.confirm.MsgSendReturnCallback; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author yuduojia * @date 2018/8/3 15:14 */ @Configuration public class RabbitMqConfig { public static final String ROUTING_KEY_1 = "batch_send_key1"; public static final String ROUTING_KEY_2 = "batch_send_key2"; @Autowired private QueueConfig queueConfig; @Autowired private ExchangeConfig exchangeConfig; @Autowired private ConnectionFactory connectionFactory; /** * 将消息队列1和交换机1进行绑定,指定队列key1 */ @Bean public Binding binding_one() { return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_1); } /** * 将消息队列2和交换机1进行绑定,指定队列key2 */ @Bean public Binding binding_two() { return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_2); } /** * 定义rabbit template用于数据的接收和发送 * 可以设置消息确认机制和回调 * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); // template.setMessageConverter(); 可以自定义消息转换器 默认使用的JDK的,所以消息对象需要实现Serializable // template.setMessageConverter(new Jackson2JsonMessageConverter()); template.setConfirmCallback(msgSendConfirmCallBack()); template.setReturnCallback(msgSendReturnCallback()); template.setMandatory(true); return template; } @Bean public MsgSendConfirmCallBack msgSendConfirmCallBack(){ return new MsgSendConfirmCallBack(); } @Bean public MsgSendReturnCallback msgSendReturnCallback(){ 
return new MsgSendReturnCallback(); } }
五、QueueConfig
package com.bootdo.common.rabbitmq.batchsendsms.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 队列配置 可以配置多个队列 * @author zhuzhe * @date 2018/5/25 13:25 * @email 1529949535@qq.com */ @Configuration public class QueueConfig { /*对列名称*/ public static final String BATCH_SEND_QUEUE_NAME1 = "batch_send_queue1"; public static final String BATCH_SEND_QUEUE_NAME2 = "batch_send_queue2"; //public static final String BATCH_SEND_QUEUE_NAME3 = "batch_send_queue3"; @Bean public Queue firstQueue() { return new Queue(BATCH_SEND_QUEUE_NAME1,true,false,false); } @Bean public Queue secondQueue() { return new Queue(BATCH_SEND_QUEUE_NAME2,true,false,false); } /*@Bean public Queue thirdQueue() { return new Queue(BATCH_SEND_QUEUE_NAME3,true,false,false); }*/ }
六、ExchangeConfig
package com.bootdo.common.rabbitmq.batchsendsms.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Exchange configuration; more exchanges can be declared here as needed.
 *
 * @author zhuzhe
 * @date 2018/5/25 15:40
 * @email 1529949535@qq.com
 */
@Configuration
public class ExchangeConfig {

    /**
     * Name of the direct exchange. Declared {@code static} so it is a true class-level
     * constant, like the name constants in the sibling configuration classes; existing
     * references through an instance still compile.
     */
    public static final String EXCHANGE_01 = "batch_send_exchange";

    /**
     * Declares the direct exchange {@link #EXCHANGE_01} as durable and not auto-deleted.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_01, true, false);
    }
}
七、确认后回调
package com.bootdo.common.rabbitmq.confirm; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; /** * 确认后回调 massage2exchange * @author yuduojia * @date 2018/8/2 15:49 */ public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息确认成功cause:"+cause); } else { //处理丢失的消息 System.out.println("消息确认失败:"+correlationData.getId()+"#cause:"+cause); } } }
八、失败后回调
package com.bootdo.common.rabbitmq.confirm;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Return callback (exchange to queue) invoked for messages handed back by the broker.
 *
 * @author yuduojia
 * @date 2018/8/2 15:48
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {

    /**
     * Prints the returned message body along with the reply code, reply text,
     * exchange and routing key.
     *
     * @param message    the returned message
     * @param replyCode  reply code passed to the callback
     * @param replyText  reply text passed to the callback
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        final String payload = new String(message.getBody());
        final StringBuilder line = new StringBuilder("确认后回调return--message:");
        line.append(payload);
        line.append(",replyCode:").append(replyCode);
        line.append(",replyText:").append(replyText);
        line.append(",exchange:").append(exchange);
        line.append(",routingKey:").append(routingKey);
        System.out.println(line.toString());
    }
}
如果要在实际项目中使用,还需要进一步优化。我这个方案还没来得及优化就被砍掉了,很遗憾,等下次用到再更新。确认回调机制可以借助 Redis 来实现。
如果有疑问或者建议欢迎评论。
标签:package rdl framework 接收 led contents server sed throws
原文地址:https://www.cnblogs.com/TimeSay/p/10869653.html