标签:amqp ceo mod 消息 component wrapper ble service config
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 10.3.98.152
port: 5672
username: rabbitmq
password: rabbitmq
publisher-confirms: true
publisher-returns: true
virtual-host: /
listener:
simple:
concurrency: 1
max-concurrency: 1
retry:
enabled: true
prefetchCount: 10
concurrentConsumers: 10
x
spring
rabbitmq
host10.3.98.152
port5672
username rabbitmq
password rabbitmq
publisher-confirmstrue
publisher-returnstrue
virtual-host /
listener
simple
concurrency1
max-concurrency1
retry
enabledtrue
prefetchCount10
concurrentConsumers10
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}
public class RabbitConfig {
public Queue Queue() {
return new Queue("hello");
}
}
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
public class HelloSender {
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
queues = "hello") (
public class HelloReceiver {
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
/**
*由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setConnectionTimeout(10000);
/* 如果要进行消息回调,则这里必须要设置为true */
connectionFactory.setPublisherConfirms(publisherConfirms);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)
*
* @return 对列名的数组
* @throws AmqpException
* @throws IOException
*/
@Bean
public String mqMsgQueues() throws AmqpException, IOException {
//获取我方的SCEOID
Result enterpriseResult = idService.getEnterprise();
String intcEnte = "";
if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {
Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();
intcEnte = (String) map.get("ente_idcode");
}
logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);
intcEnte = "zqb163";
logger.info("注册的intcEnte:" + intcEnte);
String queueName = String.format("%s.queue%d", intcEnte, 1);
connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
return queueName;
}
x
/**
*由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory
*/
public ConnectionFactory connectionFactory() {
connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setConnectionTimeout(10000);
/* 如果要进行消息回调,则这里必须要设置为true */
connectionFactory.setPublisherConfirms(publisherConfirms);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展)
*
* @return 对列名的数组
* @throws AmqpException
* @throws IOException
*/
public String mqMsgQueues() throws AmqpException, IOException {
//获取我方的SCEOID
Result enterpriseResult = idService.getEnterprise();
String intcEnte = "";
if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) {
Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData();
intcEnte = (String) map.get("ente_idcode");
}
logger.info("从ID系统查询得到的SCEOID为:" + intcEnte);
intcEnte = "zqb163";
logger.info("注册的intcEnte:" + intcEnte);
String queueName = String.format("%s.queue%d", intcEnte, 1);
connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
return queueName;
}
/**
* 监听队列
*
* @return
* @throws AmqpException
* @throws IOException
*/
@Bean
public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(mqMsgQueues());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//监听处理类
container.setMessageListener(handleService());
return container;
}
x
/**
* 监听队列
*
* @return
* @throws AmqpException
* @throws IOException
*/
public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(mqMsgQueues());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//监听处理类
container.setMessageListener(handleService());
return container;
}
@Service
public class HandleService implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
//消息消費成功(这里必须消费成功,如果因为业务的原因没有将消息消费掉,很容易造成消息阻塞)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// do something
}
}
public class HandleService implements ChannelAwareMessageListener {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
//消息消費成功(这里必须消费成功,如果因为业务的原因没有将消息消费掉,很容易造成消息阻塞)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// do something
}
}
标签:amqp ceo mod 消息 component wrapper ble service config
原文地址:https://www.cnblogs.com/amoszhu/p/9667181.html