标签:stat bind 消费 style ati routing rect int efault
直连交换机类型为:direct。加入了路由键routingKey的概念。
就是说 生产者投递消息给指定交换机的指定路由键。
只有绑定了此交换机指定路由键的消息队列才可以收到消息。
生产者:
package com.kf.queueDemo.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 路由模式的生产者(带路由键) * @author kf * */ public class DirectProducer { //交换机 private static String DIRECTEXCHANGE = "DIRECTEXCHANGE"; //路由键 private static String ROUTINGKEY = "SMS"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机类型为路由模式 channel.exchangeDeclare(DIRECTEXCHANGE, "direct"); String msg = "direct_mes"; //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上 channel.basicPublish(DIRECTEXCHANGE, ROUTINGKEY, null, msg.getBytes()); channel.close(); connection.close(); } }
消费者:
package com.kf.queueDemo.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 路由模式的邮件消费者 * @author kf * */ public class DirectEMAILConsumer { //队列名 private static String EMAILQUEUENAME = "EMAILQUEUENAME"; //路由键名 private static String SMSROUTINGKEY = "SMS"; private static String EMAILROUTINGKEY = "EMAIL"; //交换机 private static String DIRECTEXCHANGE = "DIRECTEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者启动====="); Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的 channel.queueDeclare(EMAILQUEUENAME, false, false, false, null); //绑定队列到交换机的指定路由键 channel.queueBind(EMAILQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("进入邮件接收消息的监听"); String s = new String(body, "utf-8"); System.out.println("邮件消费者接收到消息:"+s); }; }; //参数分别是:队列名,是否自动应答,监听的回调类 channel.basicConsume(EMAILQUEUENAME, true, consumer); } }
package com.kf.queueDemo.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 路由模式的短信消费者 * @author kf * */ public class DirectSMSConsumer { //队列名 private static String SMSQUEUENAME = "SMSQUEUENAME"; //路由键名 private static String SMSROUTINGKEY = "SMS"; private static String EMAILROUTINGKEY = "EMAIL"; //交换机 private static String DIRECTEXCHANGE = "DIRECTEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者启动====="); Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的 channel.queueDeclare(SMSQUEUENAME, false, false, false, null); //绑定队列到交换机的指定路由键 channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, SMSROUTINGKEY); //绑定多个交换机的路由键 channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("进入短信接收消息的监听"); String s = new String(body, "utf-8"); System.out.println("短信消费者接收到消息:"+s); }; }; //参数分别是:队列名,是否自动应答,监听的回调类 channel.basicConsume(SMSQUEUENAME, true, consumer); } }
标签:stat bind 消费 style ati routing rect int efault
原文地址:https://www.cnblogs.com/fuguang/p/10660575.html