标签:false 指定 src 连接 hand back err icc override
Topic exchange
direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但匹配规则有些不同
routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但是通常它们指定与消息相关的某些功能。一些有效的rounting key 如:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。rounting key中可以包含任意多个单词,最多255个字节。
binding key可以存在如下两种特殊的字符 即:
1、*(星号)可以代替一个单词。
2、#(哈希)可以替代零个或多个单词
? 在上面图片中,Routing key 设置为"quick.orange.rabbit"的消息将传递到两个队列。消息"lazy.orange.elephant"也将发送给他们两个。但,"quick.orange.fox"只会进入第一个队列,而"lazy.brown.fox"只会进入第二个队列。"lazy.pink.rabbit"将被传递到第二队只有一次,即使两个绑定匹配。"quick.brown.fox"与任何绑定都不匹配,因此将被丢弃。
如果我们发送一个或四个单词的消息,例如"orange"或"quick.orange.male.rabbit",这些消息将不匹配任何绑定,并且将会丢失。但"lazy.orange.male.rabbit"即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。
生产者消费者代码:
? 生产者
public class TopicEmitLog {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//创建队列
//channel.queueDeclare("direct_loge",true,false,false,null);
//声明交换机,
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message="hello";
//发送消息
//发送消息
channel.basicPublish(EXCHANGE_NAME, "topics.log", null, message.getBytes("utf-8"));
channel.close();
connection.close();
}
}
消费者1可以收到消息
public class TopicRecv {
public static final String QUEUE_NAME = "topic_queues";
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
//获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//声明通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明队列队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4.绑定队列到交换器,指定路由key为topics
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topics.#");
//
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
DeliverCallback deliverCallback = new DeliverCallback(){
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
消费者2收不到消息
public class TopicRecv2 {
public static final String QUEUE_NAME = "topic_queues2";
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
//获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//声明通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明队列队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4.绑定队列到交换器,指定路由key为topics
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topics.#");
//
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
DeliverCallback deliverCallback = new DeliverCallback(){
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
相关代码链接: https://github.com/albert-liu435/springmq
标签:false 指定 src 连接 hand back err icc override
原文地址:https://www.cnblogs.com/haizhilangzi/p/12301736.html