标签:消费 row 绑定 log oid client asi cli consumer
主题交换机类型为:topic。
是直连交换机的一种。只是比直连交换机更灵活,在路由键上引入了通配符的概念
topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有
生产者 :
package com.kf.queueDemo.exchange.topic; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 路由模式的生产者(带路由键) * @author kf * */ public class TopicProducer { //交换机 private static String TOPICEXCHANGE = "TOPICEXCHANGE"; //路由键 private static String ROUTINGKEY = "log.sms"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机类型为主题交换机模式 channel.exchangeDeclare(TOPICEXCHANGE, "topic"); String msg = "topic_mes"; //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上 channel.basicPublish(TOPICEXCHANGE, ROUTINGKEY, null, msg.getBytes()); channel.close(); connection.close(); } }
消费者:
package com.kf.queueDemo.exchange.topic; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 路由模式的邮件消费者 * @author kf * */ public class TopicEMAILConsumer { //队列名 private static String EMAILQUEUENAME = "EMAILQUEUENAME"; //路由键名 topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有 private static String SMSROUTINGKEY = "sms"; private static String EMAILROUTINGKEY = "log.*"; //交换机 private static String TOPICEXCHANGE = "TOPICEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者启动====="); Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的 channel.queueDeclare(EMAILQUEUENAME, false, false, false, null); //绑定队列到交换机的指定路由键 channel.queueBind(EMAILQUEUENAME, TOPICEXCHANGE, EMAILROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("进入邮件接收消息的监听"); String s = new String(body, "utf-8"); System.out.println("邮件消费者接收到消息:"+s); }; }; //参数分别是:队列名,是否自动应答,监听的回调类 channel.basicConsume(EMAILQUEUENAME, true, consumer); } }
package com.kf.queueDemo.exchange.topic; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 路由模式的短信消费者 * @author kf * */ public class TopicSMSConsumer { //队列名 private static String SMSQUEUENAME = "SMSQUEUENAME"; //路由键名 topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有 private static String SMSROUTINGKEY = "log.#"; private static String EMAILROUTINGKEY = "EMAIL"; //交换机 private static String TOPICEXCHANGE = "TOPICEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者启动====="); Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的 channel.queueDeclare(SMSQUEUENAME, false, false, false, null); //绑定队列到交换机的指定路由键 channel.queueBind(SMSQUEUENAME, TOPICEXCHANGE, SMSROUTINGKEY); //绑定多个交换机的路由键 channel.queueBind(SMSQUEUENAME, TOPICEXCHANGE, EMAILROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("进入短信接收消息的监听"); String s = new String(body, "utf-8"); System.out.println("短信消费者接收到消息:"+s); }; }; //参数分别是:队列名,是否自动应答,监听的回调类 channel.basicConsume(SMSQUEUENAME, true, consumer); } }
标签:消费 row 绑定 log oid client asi cli consumer
原文地址:https://www.cnblogs.com/fuguang/p/10660579.html