标签:ble back inf sub fixed amqp tom utf-8 spring
发布订阅模式其实就是生产者将数据发送到交换机,交换机将所有的消息发送到每个绑定的队列中,因此 在发布消息时可以只先指定交换机的名称,交换机的声明的代码可以放到消费者端进行声明,队列的声明也放在消费者端来声明
Exchange类型-fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
在使用发布订阅模式的时候,我们只需要声明该队列为fanout即可。如下:
channel.exchangeDeclare("logs", "fanout");
Temporary Queues(临时队列)
下面代码是两个消费者和一个生产者实现发布订阅模式
生产者代码:
public class PSEmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//声明交换机,发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
for (int i = 0; i < 10; i++) {
String message = " message" + i;
System.out.println("[send]:" + message);
//发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
消费者代码:
public class PSEmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//声明交换机,发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
for (int i = 0; i < 10; i++) {
String message = " message" + i;
System.out.println("[send]:" + message);
//发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
public class PSReceiveLogs2 {
private static final String Exchange_name = "logs";
public static void main(String[] argv) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
channel.exchangeDeclare(Exchange_name, "fanout");
//随机定义一个队列名称,也可以自己定义一个队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定队列
channel.queueBind(queueName, Exchange_name, "");
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
});
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
运行的时候需要首先运行消费者代码,不然没有队列,交换机不知道把消息投递到那些队列中。
? 也可以声明一个非临时队列,只需在绑定队列前面加上如下代码:
//随机定义一个队列名称,也可以自己定义一个队列名称
// String queueName = channel.queueDeclare().getQueue();
String queueName="A";
//声明队列
channel.queueDeclare(queueName,false,false,false,null);
其实发布订阅模式就是将消息发送到不同的队列中,由消费者选择不同的队列进行消费即可。
Springboot实现发布订阅
@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
}
}
@Configuration
public class Tut3Config {
/**
* 交换器
* @return
*/
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("tut.fanout");
}
//@Profile("receiver")
private static class ReceiverConfig {
/**
* 队列1
* @return
*/
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
/**
* 队列2
* @return
*/
@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}
/**
* 绑定队列
* @param fanout
* @param autoDeleteQueue1
* @return
*/
@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
}
@Bean
public Tut3Receiver receiver() {
return new Tut3Receiver();
}
}
//@Profile("sender")
@Bean
public Tut3Sender sender() {
return new Tut3Sender();
}
}
public class Tut3Receiver {
/**
* 监听队列
* @param in
* @throws InterruptedException
*/
@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
receive(in, 1);
}
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) throws InterruptedException {
receive(in, 2);
}
public void receive(String in, int receiver) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + receiver + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in "
+ watch.getTotalTimeSeconds() + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}
public class Tut3Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.getAndIncrement() == 3) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(fanout.getName(), "", message);
System.out.println(" [x] Sent '" + message + "'");
}
}
标签:ble back inf sub fixed amqp tom utf-8 spring
原文地址:https://www.cnblogs.com/haizhilangzi/p/12301739.html