码迷,mamicode.com
首页 > 其他好文 > 详细

5、RabbitMQ-订阅模式 Publish/Subscribe

时间:2019-03-14 16:35:03      阅读:182      评论:0      收藏:0      [点我收藏+]

标签:9.png   utf-8   exception   string   height   exchange   mamicode   throws   ima   

http://www.rabbitmq.com/tutorials/tutorial-three-java.html

 

1、模型图

我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多
个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

技术图片

在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。

 

举列:
类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)

 

解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

 

 2、代码实践

生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class Send {
     private static final String  EXCHANGE_NAME="test_exchange_fanout";
     
     public static void main(String[] args) throws IOException,  TimeoutException {
           
           Connection conn = ConnectionUtils.getConnection();
           
           Channel channel = conn.createChannel();
           
           //声明交换机
           channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
           
           //发送信息
           String msg = "hello";
           
           channel.basicPublish(EXCHANGE_NAME, "", null,  msg.getBytes());
           
           channel.close();
           conn.close();
     }
}

 

 技术图片

 

 

但是这个发送的消息到哪了呢? 
消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的
能力.因为这时还没有队列,所以就会丢失;
小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!

 

消费者1

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;
public class Receive {
     
     private static final String QUEUE_NAME="test_queue1";
     private static final String  EXCHANGE_NAME="test_exchange_fanout";
     
     public static void main(String[] args) throws IOException,  TimeoutException {
           
           Connection conn = ConnectionUtils.getConnection();
           
           Channel channel = conn.createChannel();
           
           //队列声明
           channel.queueDeclare(QUEUE_NAME, false, false, false,  null);
           
           //绑定队列到交换机转发器
           channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
           
                     //定义一个消费者
                     Consumer consumer = new  DefaultConsumer(channel){
                           //收到消息就会触发这个方法
                           @Override
                           public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                     throws IOException {
                                String msg = new  String(body,"utf-8");
                                System.out.println("消费者1接收到的消息" + msg);
                                
                                try {
                                     Thread.sleep(1500);
                                } catch (InterruptedException e)  {
                                     e.printStackTrace();
                                }finally{
                                     System.out.println("消费者1处理完成!");
                                     //手动回执
                                     channel.basicAck(envelope.getDeliveryTag(), false);
                                }
                                
                           }
                     };
                     //监听队列
                     //自动应答false
                     boolean autoAck = false;
                     channel.basicConsume(QUEUE_NAME, autoAck,  consumer);
     }
}

 

 

消费者2

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;

public class Receive2 {
    
    private static final String QUEUE_NAME="test_queue";
    private static final String EXCHANGE_NAME="test_exchange_fanout";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        Connection conn = ConnectionUtils.getConnection();
        
        Channel channel = conn.createChannel();
        
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        //绑定队列到交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        
                
                //定义一个消费者
                Consumer consumer = new DefaultConsumer(channel){
                    //收到消息就会触发这个方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                            throws IOException {
                        String msg = new String(body,"utf-8");
                        System.out.println("消费者2接收到的消息" + msg);
                        
                        try {
                            Thread.sleep(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally{
                            System.out.println("消费者2处理完成!");
                            //手动回执
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                        
                    }
                };
                //监听队列
                //自动应答false
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

 

一个消息 可以被多个消费者

技术图片

 

 技术图片

 

 后台进行查看:

技术图片

 

5、RabbitMQ-订阅模式 Publish/Subscribe

标签:9.png   utf-8   exception   string   height   exchange   mamicode   throws   ima   

原文地址:https://www.cnblogs.com/Mrchengs/p/10531050.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!