标签:amqp ceo mod 消息 component wrapper ble service config
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 10.3.98.152
port: 5672
username: rabbitmq
password: rabbitmq
publisher-confirms: true
publisher-returns: true
virtual-host: /
listener:
simple:
concurrency: 1
max-concurrency: 1
retry:
enabled: true
prefetchCount: 10
concurrentConsumers: 10
@Configuration
public class RabbitConfig {
/**
 * Registers the queue named {@code hello} as a Spring bean.
 *
 * @return a new {@link Queue} constructed with the name {@code "hello"}
 */
@Bean
public Queue Queue() {
    final String queueName = "hello";
    return new Queue(queueName);
}
}
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
 * Builds a greeting that contains the current date, echoes it to standard output and
 * passes it to {@code rabbitTemplate.convertAndSend} with {@code "hello"} as the first
 * argument.
 */
public void send() {
    final Date now = new Date();
    final String payload = "hello " + now;
    System.out.println("Sender : " + payload);
    rabbitTemplate.convertAndSend("hello", payload);
}
}
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
/**
 * Handles a text message by printing it to standard output with a {@code "Receiver : "}
 * prefix.
 *
 * @param hello the received message text
 */
@RabbitHandler
public void process(String hello) {
    final String line = "Receiver : " + hello;
    System.out.println(line);
}
}
/**
* A RabbitMQ connection is used to operate on a channel and bind queues, so the ConnectionFactory has to be created manually here.
*/
@Bean
public ConnectionFactory connectionFactory() {
    // The factory is stored in the connectionFactory field (other methods of this class read
    // it) and also returned as the bean. The field is assigned before any setter runs; the
    // local variable is only a typed alias for the same object.
    final CachingConnectionFactory factory = new CachingConnectionFactory();
    connectionFactory = factory;

    // Broker address and credentials come from this class's configuration fields.
    final String address = host + ":" + port;
    factory.setAddresses(address);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setConnectionTimeout(10000);

    // This must be set to true if message callbacks (publisher confirms) are to be used.
    factory.setPublisherConfirms(publisherConfirms);
    factory.setPublisherReturns(true);
    return factory;
}
/**
* 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)
*
* @return 对列名的数组
* @throws AmqpException
* @throws IOException
*/
@Bean
public String mqMsgQueues() throws AmqpException, IOException {
//获取我方的SCEOID
Result enterpriseResult = idService.getEnterprise();
String intcEnte = "";
if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {
Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();
intcEnte = (String) map.get("ente_idcode");
}
logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);
intcEnte = "zqb163";
logger.info("注册的intcEnte:" + intcEnte);
String queueName = String.format("%s.queue%d", intcEnte, 1);
connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
return queueName;
}x
/**
* Listens on the queue returned by mqMsgQueues().
*
* @return
* @throws AmqpException
* @throws IOException
*/
@Bean
public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(mqMsgQueues());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//监听处理类
container.setMessageListener(handleService());
return container;
}x
@Service
public class HandleService implements ChannelAwareMessageListener {
/**
 * Reads the body of the delivered message and acknowledges that single delivery on the
 * given channel before any business handling takes place.
 *
 * @param message the delivered message
 * @param channel the channel on which the acknowledgement is sent
 * @throws Exception if reading the message or acknowledging it fails
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    byte[] payload = message.getBody();
    // Mark the message as consumed. It must be acknowledged here: if a message is left
    // unacknowledged for business reasons, messages easily pile up and block.
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
    // do something
}
}
标签:amqp ceo mod 消息 component wrapper ble service config
原文地址:https://www.cnblogs.com/amoszhu/p/9667181.html