标签:context core 短信 src catch direct info actor auth
1.pom文件引入maven依赖:
<!-- RabbitMq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.application.properties文件添加自定义配置,也可使用默认的key:
#使用rabbitmq发送短信 spring.rabbitmq.sms.addresses=10.0.193.28:5672,10.0.193.29:5672 spring.rabbitmq.sms.username=admin spring.rabbitmq.sms.password=123456 spring.rabbitmq.sms.virtual-host=msg spring.rabbitmq.sms.publisher-confirms=true spring.rabbitmq.sms.publisher-returns=true #rabbitmq服务连接超时时间/ms spring.rabbitmq.sms.connect.timeout=10000 sms.direct.exchange=exc.msgtosend sms.direct.queue=que.msg.fast.batch sms.direct.routingkey=msg_fast
3.添加配置类RabbitMqConfig,配置RabbitTemplate模板
1 package cn.drz.config; 2 3 import javax.annotation.Resource; 4 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.core.Binding; 8 import org.springframework.amqp.core.BindingBuilder; 9 import org.springframework.amqp.core.DirectExchange; 10 import org.springframework.amqp.core.Queue; 11 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 12 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 13 import org.springframework.amqp.rabbit.core.RabbitTemplate; 14 import org.springframework.beans.factory.annotation.Qualifier; 15 import org.springframework.beans.factory.annotation.Value; 16 import org.springframework.context.annotation.Bean; 17 import org.springframework.context.annotation.Configuration; 18 19 /** 20 * 使用RabbitMq Direct交换器模式推送消息 21 * @author drz 22 * 23 */ 24 @Configuration 25 public class RabbitMqConfig { 26 27 private final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class); 28 29 @Resource 30 private BasicConfig basicConfig; 31 32 @Bean 33 public Queue smsDirectQueue() { 34 return new Queue(basicConfig.getSmsDirectQueue()); 35 } 36 37 @Bean 38 public DirectExchange smsDirectExchange() { 39 return new DirectExchange(basicConfig.getSmsDirectExchange()); 40 } 41 42 @Bean 43 public Binding bindQueueToExchangeWithKey() { 44 return BindingBuilder.bind(smsDirectQueue()).to(smsDirectExchange()) 45 .with(basicConfig.getSmsRoutingKey()); 46 } 47 48 @Bean(name = "smsConnectionFactory") 49 public ConnectionFactory smsConnectionFactory( 50 @Value("${spring.rabbitmq.sms.addresses}") String addresses, 51 @Value("${spring.rabbitmq.sms.username}") String username, 52 @Value("${spring.rabbitmq.sms.password}") String password, 53 @Value("${spring.rabbitmq.sms.virtual-host}") String virtualHost, 54 @Value("${spring.rabbitmq.sms.publisher-confirms}") boolean publisherConfirms, 55 @Value("${spring.rabbitmq.sms.publisher-returns}") boolean publisherReturns) { 56 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 57 connectionFactory.setAddresses(addresses); 58 connectionFactory.setUsername(username); 59 connectionFactory.setPassword(password); 60 connectionFactory.setVirtualHost(virtualHost); 61 connectionFactory.setPublisherConfirms(publisherConfirms); 62 connectionFactory.setPublisherReturns(publisherReturns); 63 connectionFactory.setConnectionTimeout(basicConfig.getSmsConnectTimeout()); 64 return connectionFactory; 65 } 66 67 @Bean(name = "smsRabbitTemplate") 68 public RabbitTemplate smsRabbitTemplate( 69 @Qualifier("smsConnectionFactory") ConnectionFactory connectionFactory) { 70 RabbitTemplate smsRabbitTemplate = new RabbitTemplate(connectionFactory); 71 smsRabbitTemplate.setMandatory(true); 72 logger.info("加载smsRabbitTemplate完成"); 73 return smsRabbitTemplate; 74 } 75 }
BasicConfig配置基类如下,也可将application.properties文件中的配置全部映射到BasicConfig;
1 package cn.drz.config; 2 3 import org.springframework.beans.factory.annotation.Value; 4 import org.springframework.stereotype.Component; 5 6 import lombok.Getter; 7 import lombok.Setter; 8 9 @Getter 10 @Setter 11 @Component 12 public class BasicConfig { 13 14 @Value("${sms.direct.queue}") 15 private String smsDirectQueue; 16 17 @Value("${sms.direct.exchange}") 18 private String smsDirectExchange; 19 20 @Value("${sms.direct.routingkey}") 21 private String smsRoutingKey; 22 23 @Value("${spring.rabbitmq.sms.connect.timeout}") 24 private int smsConnectTimeout; 25 26 }
4.以上配置完毕,即可使用RabbitTemplate发消息,并能通过实现ConfirmCallback, ReturnCallback来接收发送消息结果的回调:
如下是发短信的demo:
接口:
1 package cn.drz.service.open; 2 3 import cn.drz.model.Message; 4 5 public interface SmsService { 6 /** 7 * 发送短信 8 */ 9 void sendMsg(Message message) throws Exception; 10 11 }
model 类 Message:
1 package cn.drz.model; 2 3 import java.io.Serializable; 4 5 import lombok.Getter; 6 import lombok.Setter; 7 @Getter 8 @Setter 9 public class Message implements Serializable { 10 11 12 String mobile; 13 14 String content; 15 16 }
实现类:
1 package cn.drz.service.open.impl; 2 3 import java.io.IOException; 4 import java.util.ArrayList; 5 import java.util.HashMap; 6 import java.util.List; 7 import java.util.Map; 8 import java.util.UUID; 9 10 import javax.annotation.Resource; 11 12 import org.codehaus.jackson.map.ObjectMapper; 13 import org.slf4j.Logger; 14 import org.slf4j.LoggerFactory; 15 import org.springframework.amqp.rabbit.core.RabbitTemplate; 16 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; 17 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; 18 import org.springframework.amqp.rabbit.support.CorrelationData; 19 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.stereotype.Service; 21 22 import cn.drz.config.BasicConfig; 23 import cn.drz.Message; 24 import cn.drz.service.open.SmsService; 25 26 @Service("smsService") 27 public class SmsServiceImpl implements SmsService, ConfirmCallback, ReturnCallback { 28 private static final Logger logger = LoggerFactory.getLogger(SmsServiceImpl.class); 29 30 @Resource 31 private BasicConfig basicConfig; 32 33 @Resource 34 private RabbitTemplate smsRabbitTemplate; 35 36 /** 37 * 实现消息发送到RabbitMQ交换器后接收ack回调 38 */ 39 @Override 40 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 41 42 String mqId = correlationData.getId(); 43 if (ack) { 44 logger.info("RabbitMq消息发送到交换器{}成功,mqid={}", smsRabbitTemplate.getExchange(), mqId); 45 } else { 46 logger.error("RabbitMq消息发送到交换器{}失败,mqid={},失败原因={}", smsRabbitTemplate.getExchange(), 47 mqId, cause); 48 //do something 49 } 50 } 51 52 /** 53 * 实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调 54 */ 55 @Override 56 public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, 57 String replyText, String exchange, String routingKey) { 58 String errInfo = "消息发送失败,未找到指定队列:replyCode=" + replyCode + ",replyText=" + replyText 59 + ",exchange=" + exchange + ",routingKey=" + routingKey + ",message=" + message; 60 logger.error(errInfo); 61 //do something 62 } 63 64 @Override 65 public void sendMsg(Message message) throws Exception { 66 String mobiles = message.getMobile(); 67 String[] mobileList = mobiles.split(","); 68 for (int i = 0; i < mobileList.length; i++) { 69 sendRabbitMQ(mobileList[i], message.getContent()); 70 } 71 72 } 73 74 private void sendRabbitMQ(String mobile, String content) throws Exception { 75 76 String mqId = "NGDP" + System.currentTimeMillis() 77 + UUID.randomUUID().toString().replace("-", "").substring(0, 20); 78 79 List<Map<String, Object>> requestBody = new ArrayList<Map<String, Object>>(); 80 Map<String, Object> requestMap = new HashMap<>(); 81 requestMap.put("serviceid", "10001"); 82 requestMap.put("sourcesystem", "NGDP"); 83 requestMap.put("sendtarget", mobile); 84 requestMap.put("smcontent", content.length() > 990 ? content.substring(0, 990) : content); 85 requestMap.put("msg_type", "sms"); 86 requestMap.put("pipeline_number", mqId); 87 requestBody.add(requestMap); 88 String message = null; 89 ObjectMapper objectMapper = new ObjectMapper(); 90 try { 91 message = objectMapper.writeValueAsString(requestBody); 92 } catch (IOException e) { 93 logger.error("生成Json数据失败,失败原因:{}", e); 94 } 95 send(basicConfig.getSmsDirectExchange(), basicConfig.getSmsRoutingKey(), message, mqId); 96 } 97 98 private void send(String exchange, String routingKey, String msg, String mqId) 99 throws ServiceException { 100 101 logger.info("开始发送rabbitmq消息 :exchange={}, queue={}, routingkey={}, mqId={}, msg={}", 102 basicConfig.getSmsDirectExchange(), basicConfig.getSmsDirectQueue(), 103 basicConfig.getSmsRoutingKey(), mqId, msg); 104 CorrelationData correlationId = new CorrelationData(mqId); 105 smsRabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationId); 106 logger.info("发送消息完成,mqId={}", mqId); 107 } 108 109 110 111 }
标签:context core 短信 src catch direct info actor auth
原文地址:https://www.cnblogs.com/itfeng813/p/14442960.html