标签:catch rabbit bytes rri autowire cal 代码实现 gre connect
生产者将消息发送到 direct 交换器。队列与交换器绑定时会指定一个 binding key,生产者发送消息时会指定一个 routing key;消息只会被路由到 binding key 与该 routing key 完全相同的队列,随后由监听该队列的消费者消费消息。
Direct exchange
会把消息路由到那些binding key与routing key完全匹配的Queue中
在这个示例中,direct 交换器 X 绑定了两个队列:第一个队列 Q1 的 binding key 为 orange;第二个队列 Q2 有两个绑定,binding key 分别为 black 和 green。
使用routing key为orange发布到交换机的消息 将被路由到队列Q1。routing key为black 或green的消息将转到Q2。所有其他消息将被丢弃。
Multiple Bindings
用相同的 binding key 绑定多个队列是完全合法的。例如,可以用 binding key black 在 X 与 Q1 之间再添加一个绑定,这样 Q1 和 Q2 都以 black 绑定到 X。在这种情况下,direct 交换器的行为类似于 fanout(扇出),会把消息广播到所有匹配的队列:routing key 为 black 的消息将同时投递给 Q1 和 Q2。
下面代码实现生产者和消费者的Direct模式
生产者代码:
/**
 * Producer for the direct-exchange example.
 *
 * <p>Declares the {@code direct_logs} exchange (type {@code direct}) and publishes a single
 * message with routing key {@code "err"}. Only queues bound to the exchange with exactly
 * that binding key receive the message.
 */
public class DirectEmitLog {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Connects to the broker, publishes one message and releases the channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel first, then the connection, even when a
        // declare/publish call throws (the original leaked both in that case).
        try (Connection connection =
                     ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
             Channel channel = connection.createChannel()) {
            // Declare the queue (non-durable, non-exclusive, no auto-delete, no arguments).
            // Note: this class does not bind the queue; the binding is created by the consumer.
            channel.queueDeclare("direct_loge", false, false, false, null);
            // Declare the exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String message = "hello";
            // To send several messages instead, wrap the publish in a loop, e.g.:
            // for (int i = 0; i < 10; i++) {
            //     String message = " message" + i;
            //     System.out.println("[send]:" + message);
            //     channel.basicPublish(...);
            // }

            // Publish the message with routing key "err".
            channel.basicPublish(EXCHANGE_NAME, "err", null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者代码:
/**
 * Consumer for the direct-exchange example.
 *
 * <p>Declares the {@code direct_logs} exchange and the {@code direct_loge} queue, binds the
 * queue with binding key {@code "err"}, and prints every message delivered to that queue.
 */
public class DirectRecv {
    private final static String QUEUE_NAME = "direct_loge";
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Sets up exchange, queue and binding, then registers an auto-acknowledging consumer.
     * The connection is intentionally left open so the consumer keeps receiving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the connection.
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        // Open a channel.
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Declare the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Only messages published with routing key "err" are routed to this queue.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "err");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
                System.err.println(" [x] Interrupted while processing '" + message + "'");
            } finally {
                System.out.println(" [x] Done");
                // With autoAck = false this is where the delivery would be acknowledged:
                //channel.basicAck();
                //channel.basicNack();
            }
        };

        boolean autoAck = true; // acknowledgment is covered below
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });

        // Equivalent form using anonymous classes instead of lambdas:
        // DeliverCallback deliverCallback = new DeliverCallback(){
        //     @Override
        //     public void handle(String consumerTag, Delivery delivery) throws IOException {
        //         String message = new String(delivery.getBody(), "UTF-8");
        //         System.out.println(" [x] Received '" + message + "'");
        //     }
        // };
        //
        // channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
        //     @Override
        //     public void handle(String consumerTag) throws IOException {
        //
        //     }
        // });
    }

    /**
     * Simulates work: sleeps one second for every {@code '.'} in the task text.
     *
     * @param task message text to "process"
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}
SpringBoot相关代码:
/**
 * Spring Boot entry point for the tutorial application.
 *
 * <p>{@code @EnableScheduling} is present so that {@code @Scheduled} methods in the
 * application context are invoked.
 */
@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws Exception declared by the original signature; propagated unchanged
     */
    public static void main(final String[] args) throws Exception {
        SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
    }
}
/**
 * Bean definitions for the direct-exchange (routing) example.
 *
 * <p>Declares the {@code tut.direct} exchange, two anonymous queues, four bindings
 * (queue 1: {@code orange}, {@code black}; queue 2: {@code green}, {@code black}),
 * plus the receiver and sender beans.
 */
@Configuration
public class Tut4Config {

    /** The direct exchange all bindings below attach to. */
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("tut.direct");
    }

    /** Receiver-side beans: the two queues, their bindings, and the listener. */
    private static class ReceiverConfig {

        /** First anonymous queue; referenced by bean name from the listener. */
        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }

        /** Second anonymous queue; referenced by bean name from the listener. */
        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }

        /** Queue 1 receives messages routed with key "orange". */
        @Bean
        public Binding binding1a(DirectExchange direct,
                                 Queue autoDeleteQueue1) {
            return route(autoDeleteQueue1, direct, "orange");
        }

        /** Queue 1 also receives messages routed with key "black". */
        @Bean
        public Binding binding1b(DirectExchange direct,
                                 Queue autoDeleteQueue1) {
            return route(autoDeleteQueue1, direct, "black");
        }

        /** Queue 2 receives messages routed with key "green". */
        @Bean
        public Binding binding2a(DirectExchange direct,
                                 Queue autoDeleteQueue2) {
            return route(autoDeleteQueue2, direct, "green");
        }

        /** Queue 2 also receives messages routed with key "black". */
        @Bean
        public Binding binding2b(DirectExchange direct,
                                 Queue autoDeleteQueue2) {
            return route(autoDeleteQueue2, direct, "black");
        }

        /** Listener bean consuming from both queues. */
        @Bean
        public Tut4Receiver receiver() {
            return new Tut4Receiver();
        }

        /** Builds a binding of {@code queue} to {@code exchange} under {@code routingKey}. */
        private static Binding route(Queue queue, DirectExchange exchange, String routingKey) {
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with(routingKey);
        }
    }

    /** Scheduled sender bean. */
    @Bean
    public Tut4Sender sender() {
        return new Tut4Sender();
    }
}
/**
 * Listener for the two queues of the routing example. Each delivery is printed,
 * "processed" by sleeping one second per {@code '.'} in the payload, and timed.
 */
public class Tut4Receiver {

    /** Handles deliveries from the first queue as receiver instance 1. */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    /** Handles deliveries from the second queue as receiver instance 2. */
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    /**
     * Prints the received payload, simulates work on it and prints the elapsed time.
     *
     * @param in       message payload
     * @param receiver instance number used in the printed output
     * @throws InterruptedException if interrupted during the simulated work
     */
    public void receive(String in, int receiver) throws InterruptedException {
        final StopWatch timer = new StopWatch();
        timer.start();
        System.out.println("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        timer.stop();
        final double elapsedSeconds = timer.getTotalTimeSeconds();
        System.out.println("instance " + receiver + " [x] Done in " +
                elapsedSeconds + "s");
    }

    /** Sleeps for one second per dot in {@code in}. */
    private void doWork(String in) throws InterruptedException {
        final int length = in.length();
        for (int pos = 0; pos < length; pos++) {
            if (in.charAt(pos) != '.') {
                continue;
            }
            Thread.sleep(1000);
        }
    }
}
/**
 * Scheduled sender for the routing example. On every run it publishes one message to the
 * injected direct exchange, rotating the routing key through {@code keys} and numbering
 * the messages with a running counter.
 */
public class Tut4Sender {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private DirectExchange direct;

    /** Position in {@link #keys} of the routing key used by the most recent send. */
    AtomicInteger index = new AtomicInteger(0);
    /** Number of messages sent so far. */
    AtomicInteger count = new AtomicInteger(0);

    private final String[] keys = {"orange", "black", "green"};

    /**
     * Publishes {@code "Hello to <key> <n>"} using the next routing key in the rotation.
     * The index is advanced before use, so the first key sent is {@code "black"}.
     */
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        // Advance and wrap in one atomic step; the bound follows the array length
        // instead of a hard-coded 3.
        int position = this.index.updateAndGet(current -> (current + 1) % keys.length);
        String key = keys[position];

        StringBuilder builder = new StringBuilder("Hello to ");
        builder.append(key).append(' ');
        // Increment the counter; the original only read it, so every message carried 0.
        builder.append(this.count.incrementAndGet());
        String message = builder.toString();

        template.convertAndSend(direct.getName(), key, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}
标签:catch rabbit bytes rri autowire cal 代码实现 gre connect
原文地址:https://www.cnblogs.com/haizhilangzi/p/12301713.html