码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitMQ 学习

时间:2019-04-27 09:27:38      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:ica   nec   通过   getc   bin   imp   publish   声明   数据   

 

1,消息队列解决什么问题

  异步处理

  应用解耦

  流量消峰(高并发、  秒杀)

  日志处理......

2,virtual host   

  相当于mysql数据库的db,一般以/开头

  授权

3,使用java开发rabbitMQ

  (1) 简单队列

  技术图片

  p:消息生产者

  红色:消息队列

  c:消息消费者

  3个对象,生产者、消息队列、消费者

  使用步骤

  生产端:

    1,获取链接

    2,获取通道

    3,创建队列声明

    4,发布消息

  消费端:

    1,获取链接

    2,创建通道

    3,定义队列消费者

    4,监听队列

  简单队列的不足:耦合性高,一一对应消费者,如果有多个消费者就无法使用;

  work queues 工作队列

4,工作队列(work queues)

  分为两种 模式 轮询和公平

  协议(amqp)

  技术图片

为什么会出现工作队列,simple queue(round )是一一对应,实际开发中,生产者发送消息是好不费力,而消费者一般是需要跟业务相结合,消费者接收到消息就需要处理,可能需要花费时间,

这时候队列就会积压很多消息;

 

轮训分发的机制,不管谁忙谁闲都是你一个完成一个进行分发;

 

公平分发机制-fair dipatch:生产端,basicQos(perfetchCount=1);   消费成功后告诉MQ,才再发一条消息;使用公平分发必须关闭自动应答ack ;能者多劳!

 

5,消息应答与消息持久化

  消息应答:

  boolean aotoAck=true;
       channel.basicConsume("mySimpleQueue", aotoAck, consumer);

 

  boolean aotoAck=true(自动确认模式)一旦 rabbitMQ将消息分发给消费者就会从内存中删除该消息,

  这种情况下如果消费端服务出现异常,未能正确的消费消息,该消息会丢失;

  boolean aotoAck=false(手动模式),如果有消费者异常,就会将该消息交付给其他消费者rabbitMQ支持消费应答,消费者发送一个消息应答,告诉RabbitMQ已已消费完成,

  然后rabbitMQ可以删除该消息,消息应答默认是打开的-false

 

  消息持久化:

  当RabbitMQ服务宕机,存储在内存中的消息会丢失,这是就需要消息持久化;

  

  channel.queueDeclare("mySimpleQueue", durable, false, false, null);

  durable:持久化

 

6,订阅模式publish/subscribe

以上的消息只能由一个消费者消费,不能给多个消费者消费!

订阅模式:

技术图片

  1,一个生产者多个消费者,

  2,每个消费者都有自己的队列,

  3,生产者没有将消息直接发送给队列,而是发送给交换机或者叫转发器exchange

  4,每个队列都要绑定到交换机

  5,生产者发送的消息经过交换机,然后到达队列,就能实现一个消息被多个消费者消费

  代码:

  生产者:

   Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //发送消息
        String exchange="exchageMSG";
        channel.basicPublish(EXCHANGE_NAME, "", null, exchange.getBytes());
        channel.close();
        connection.close();

  发送消息后图形化界面有新的交换机生成,消息并没有存储,因为rabbitMQ中只有队列能存储消息!

 

 

  消费者1-代码

   
    public static final String  EXCHANGE_NAME="my_exchange";
    public static final String QUEUENAME="test_send_email_quene";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        //绑定队列到交换机exchange
        channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
        channel.basicQos(1);//保证一次只分发一个
        //定义一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
                
            };
        };
        
        boolean autoAck=false;
        channel.basicConsume(QUEUENAME, autoAck, consumer);
        
    }

消费者2代码

 
    public static final String  EXCHANGE_NAME="my_exchange";
    public static final String QUEUENAME="test_send_message_quene";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        //绑定队列到交换机exchange
        channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
        channel.basicQos(1);//保证一次只分发一个
        //定义一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
                
            };
        };
        
        boolean autoAck=false;
        channel.basicConsume(QUEUENAME, autoAck, consumer);
        
    }

  可视化界面截图
技术图片

7,交换机-转发器(Exchange)

  

  

  一方面是接受生产者的消息,另一方面是向队列推送消息

  匿名转发 exchangeName "";

  fanout(不处理路由键)

  技术图片

  Direct(处理路由键)

  技术图片

8,路由模式

rabbit使用步骤

服务端发送消息:1,获取链接;2,通过链接获取通道;3;通过通道声明交换机;4,发布消息(包括交换机和路由);  5,关闭资源  

消费端消费消息: 1,获取链接;2,获取通道;3,通过通道声明队列;4,绑定交换机和路由;5,创建consumer,重写handledelivery方法,获取消息 ;6,监听consumer ;

技术图片

 

 生产者代码


    public static final String EXCHANGE_NAME="rounting_exchange";
    public static final String ROUTINGKEY="info";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message="my rounting exchange";
        channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
        System.out.println("send...");
        channel.close();
        connection.close();
    }

消费者1:  

 
    public static final String QUEUENAME="test_queue_direct_1";
    public static final String EXCHANGE_NAME="rounting_exchange";
    public static final String ROUTINGKEY="error";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        channel.basicQos(1);//保证一次只发一个
        channel.queueBind(QUEUENAME, EXCHANGE_NAME, ROUTINGKEY);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                channel.basicAck(envelope.getDeliveryTag(), false); //告诉MQ已成功收到消息,手动确认
                System.out.println(msg);
                System.out.println("test_queue_direct_1");
            }
        };
        boolean autoAck=false;//取消自动应答,默认为关闭状态
        channel.basicConsume(QUEUENAME, autoAck, consumer);
        
    }

消费者2


    public static final String QUENENAME="test_queue_direct_2";
    public static final String EXCHANGENAME="rounting_exchange";
    public static final String ROUTINGKEYNAME1="error";
    public static final String ROUTINGKEYNAME2="info";
    public static final String ROUTINGKEYNAME3="warn";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUENENAME, false, false, false, null);
        channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME1);
        channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME2);
        channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME3);
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg=new String(body);
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println(msg);
                System.out.println(QUENENAME);
            }
        };
        boolean autoAck=false;//关闭自动确认
        channel.basicConsume(QUENENAME, autoAck, consumer);
    }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

 

rabbitMQ 学习

标签:ica   nec   通过   getc   bin   imp   publish   声明   数据   

原文地址:https://www.cnblogs.com/baoguochun/p/10777519.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!