标签:生产者 消费者 cep ack rgs utils env time div
rabbit引入交换机概念。
交换机与生产者绑定。
队列与消费者绑定。
队列又与交换机绑定。
扇形交换机是 fanout类型的。
类似于其他消息中间件的 topic。一对多(生产者推送消息到指定交换机,一个或多个绑定了此交换机的消费者的队列都可以收到此交换机的消息)
生产者:
package com.kf.queueDemo.exchange.fanout; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * fanout 扇形交换机 * @author kf *生产者与交换机做绑定,消费者的队列与交换机做绑定 */ public class FanoutProducer { private static String EXCHANGENAME = "MYEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //绑定交换机。第一个参数是交换机的名字,第二个是交换机的类型 channel.exchangeDeclare(EXCHANGENAME, "fanout"); String mes = "exchange_mes"; //发布消息 channel.basicPublish(EXCHANGENAME, "", null, mes.getBytes()); System.out.println("fanout发送消息"+mes); channel.close(); connection.close(); } }
消费者:
package com.kf.queueDemo.exchange.fanout; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * fanout消费者(消费者创建队列,声明队列与交换机的绑定关系) * @author kf * */ public class FanoutEmail_Consumer { private static String EXCHANGENAME = "MYEXCHANGE"; private static String EMAIL_QUEUE = "EMAIL_QUEUE"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者已启动=============="); Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //声明消费者队列 channel.queueDeclare(EMAIL_QUEUE, false, false, false, null); //队列绑定交换机 channel.queueBind(EMAIL_QUEUE, EXCHANGENAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("EMAIL消费者监听"); String s = new String(body, "utf-8"); System.out.println("监听到消息:"+s); }; }; //通道监听 自动签收 channel.basicConsume(EMAIL_QUEUE, true,consumer); } }
package com.kf.queueDemo.exchange.fanout; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * fanout消费者(消费者创建队列,声明队列与交换机的绑定关系) * @author kf * */ public class FanoutSMS_Consumer { private static String EXCHANGENAME = "MYEXCHANGE"; private static String SMS_QUEUE = "SMS_QUEUE"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者已启动=============="); Connection connection = RabbitConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //声明消费者队列 channel.queueDeclare(SMS_QUEUE, false, false, false, null); //队列绑定交换机 channel.queueBind(SMS_QUEUE, EXCHANGENAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("SMS消费者监听"); String s = new String(body, "utf-8"); System.out.println("监听到消息:"+s); }; }; //通道监听 自动签收 channel.basicConsume(SMS_QUEUE, true,consumer); } }
标签:生产者 消费者 cep ack rgs utils env time div
原文地址:https://www.cnblogs.com/fuguang/p/10660559.html