码迷,mamicode.com
首页 > 其他好文 > 详细

MQ的订阅模式

时间:2018-03-20 00:44:25      阅读:224      评论:0      收藏:0      [点我收藏+]

标签:div   create   com   手动   技术分享   width   get   rabbit   imp   

一:介绍

1.模式

  技术分享图片

 

2.使用场景

  一个生产者,多个消费者

  每一个消费者都有自己的队列

  生产者没有直接把消息发送给队列,而是发送到了交换机

  每一个队列都要绑定到交换机

  可以实现一个消息被多个消费者消费。

 

二:程序

1.生产者

 1 package com.mq.PubSubFanout;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 public class FanoutSend {
 8     private static final String EXCHANGE_NAME="text_exchange_fanout";
 9     public static void main(String[] args) throws Exception {
10         //获取一个连接
11         Connection connection= ConnectionUtil.getConnection();
12         //从连接中获取一个通道
13         Channel channel=connection.createChannel();
14         //创建交换机
15         channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
16 
17         //消息
18         String msg="hello pubsub";
19 
20         //发送
21         channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
22 
23         System.out.println("send msg:"+msg);
24         //关闭连接
25         channel.close();
26         connection.close();
27     }
28 }

 

2.消费者一

 1 package com.mq.PubSubFanout;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class FanoutReceive1 {
 9     private static final String EXCHANGE_NAME="text_exchange_fanout";
10     private static final String QUENE_NAME="test_fanout_queue_email";
11     public static void main(String[] args)throws Exception{
12         //获取一个连接
13         Connection connection = ConnectionUtil.getConnection();
14         //创建通道
15         final Channel channel = connection.createChannel();
16         //创建队列声明
17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
18 
19         //绑定交换机
20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
21 
22         //一次只能发送一个消息
23         channel.basicQos(1);
24 
25         //创建消费者
26         DefaultConsumer consumer=new DefaultConsumer(channel){
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 String msg=new String(body,"utf-8");
30                 System.out.println("[1]receive msg:"+msg);
31                 try {
32                     Thread.sleep(200);
33                 } catch (InterruptedException e) {
34                     e.printStackTrace();
35                 }finally {
36                     System.out.println("done");
37                     //手动应答
38                     channel.basicAck(envelope.getDeliveryTag(),false);
39                 }
40             }
41         };
42         //监听队列,不是自动应答
43         boolean autoAck=false;
44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
45     }
46 }

 

3.消费者二

 1 package com.mq.PubSubFanout;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class FanoutReceive2 {
 9     private static final String EXCHANGE_NAME="text_exchange_fanout";
10     private static final String QUENE_NAME="test_fanout_queue_ems";
11     public static void main(String[] args)throws Exception{
12         //获取一个连接
13         Connection connection = ConnectionUtil.getConnection();
14         //创建通道
15         final Channel channel = connection.createChannel();
16         //创建队列声明
17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
18 
19         //绑定交换机
20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
21 
22         //一次只能发送一个消息
23         channel.basicQos(1);
24 
25         //创建消费者
26         DefaultConsumer consumer=new DefaultConsumer(channel){
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 String msg=new String(body,"utf-8");
30                 System.out.println("[2]receive msg:"+msg);
31                 try {
32                     Thread.sleep(200);
33                 } catch (InterruptedException e) {
34                     e.printStackTrace();
35                 }finally {
36                     System.out.println("done");
37                     //手动应答
38                     channel.basicAck(envelope.getDeliveryTag(),false);
39                 }
40             }
41         };
42         //监听队列,不是自动应答
43         boolean autoAck=false;
44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
45     }
46 }

 

4.效果

  send:

  技术分享图片

  receive1:

  技术分享图片

  receive2:

  技术分享图片

 

5.运行注意点

  如果之间运行receive类,会发现报错,因为没有交换机。

  所以,可以先运行send类,虽然交换机不能存储发送的消息,但是可以创建交换机。

  然后,就可以按照原来的方式。

  先启动两个消费者进行监听,然后启动生产者。

  现象:就是消费者都获取到了生产者生产的消息。

 

MQ的订阅模式

标签:div   create   com   手动   技术分享   width   get   rabbit   imp   

原文地址:https://www.cnblogs.com/juncaoit/p/8605874.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!