码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMq

时间:2020-07-13 19:58:40      阅读:58      评论:0      收藏:0      [点我收藏+]

标签:stat   font   基于   ade   argument   ringbuf   root   dir   byte   

一、连接和通道

@Configuration
public class RabbirnqConfig {
    private static final String RABBIT_HOST = "localhost";
    private static final String RABBIT_USERNAME = "root";
    private static final String RABBIT_PASSWORD = "root";
    private static final int RABBIT_PORT = 5672;

    @Bean
    public static  Connection getConnection(){
            Connection connection = null;
            ConnectionFactory connectionFactory = new ConnectionFactory();

            //连接rabbitMq,第一种方式,通过分配属性
            connectionFactory.setHost(RABBIT_HOST);//主机名
            connectionFactory.setUsername(RABBIT_USERNAME);//用户名
            connectionFactory.setPassword(RABBIT_PASSWORD);//密码
            connectionFactory.setPort(RABBIT_PORT);//端口

            try {
                //连接rabbitMq,第二种方式,使用uri,如下会有默认值
                //connectionFactory.setUri("amqp:// userName:password @ hostName:portNumber / virtualHost");

                connection = connectionFactory.newConnection();
            }catch (Exception e){
                e.printStackTrace();
            }
        return connection;
    }
}

二、通过交换机进行消息传递的几种模式

  如果消息发送到没有队列绑定的交换机时,消息将丢失,因为交换机没有存储消息的能力,消息只能存在在队列中
  如果消息不通过交换机的,一条消息可能会有n个队列抢

  <一>、扇形交换机模式

        解释几个频繁使用的方法: 

        获取连接:Connection connection = RabbirnqConfig.getConnection();
        创建通道:Channel channel = connection.createChannel();
        声明交换机:channel.exchangeDeclare(EXCHANGE_NAME_FANOUT,"fanout",true,true,null);、
          exchangeDeclare有很多的方法重载。以参数最多的为例。此处列出每个参数,方便具体使用哪个重载方法时,里面用到的参数都可以在这里找到代表的意思
          DeclareOk exchangeDeclare(String var1, String var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6) throws IOException;
          var1(exchange):交换器名称
          var2(BuiltinExchangeType type):交换器类型,可选值:DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")
          var3(boolean durable):是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息
          var4(boolean autoDelete):是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为fase
          var5(boolean internal):是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
          var6(Map<String, Object> arguments):其它一些结构化参数比如:alternate-exchange
          消息发布:channel.basicPublish(EXCHANGE_NAME_FANOUT,"",null,message.getBytes());

    1)生产者

public void fanoutPpro() throws Exception {//生产消息
    Connection connection = RabbirnqConfig.getConnection();
    //创建通道
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME_FANOUT,"fanout",true,true,null);

    String message = "Hello World!";

    channel.basicPublish(EXCHANGE_NAME_FANOUT,"",null,message.getBytes());
    System.out.println("生产者 send :"+message);
    channel.close();
    connection.close();
}

  

    2)  消费者

      几个方法解释:       

      创建队列:channel.queueDeclare(F_QUEUE_NAME,false,false,false,null); 此方法同一个队列名只能创建一次,如果再次接受消息是还是用该方法创建同一个队列名,会抛出异常(再次测试这个队列接受消息,把这个方法注释掉即可)

         queueDeclare有多个重载方法,以参数最多的这个为例解释
        com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
        如果queueDeclare()不带参数,该方法默认创建一个由RabbitMq命名的名称,这种队列也称之为匿名队列,排他 的,自动删除的,非持久化的队列queueDeclare()不带参数方法默认创建一个由RabbitMq命名的名称,这种队列也称之为匿名队列,排他的,自动删除的,非持久化的队列
        var1(queue):队列名称
        var2(durable):是否持久化, true ,表示持久化,会存盘,服务器重启仍然存在,false,非持久化
        var3(exclusive):exclusive : 是否排他的,true,排他。如果一个队列声明为排他队列,该队列会对首次声明它的连接可见,并在连接断开时自动删除;排他是基于连接的Connection可见的,同一个连接的不同信道是可以同时访问同一个连接创建的排他队列
                         如果一个连接已经声明了一个排他队列,其它连接是不允许建立同名的排他队列,这个与普通队列不同,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除
        var4(autoDelete);是否自动删除,true,自动删除,自动删除的前提:至少有一个消息者连接到这个队列,之后所有与这个队列连接的消息都断开时,才会自动删除;注意,生产者客户端创建这个队列,或者没有消息者客户端连接这个队列时,不会自动删除这个队列
        var5(arguments):其它一些参数。如:x-message-ttl,之类
     
      绑定到交换机:channel.queueBind(F_QUEUE_NAME,EXCHANGE_NAME_FANOUT,"");
  public void fanoutCus1() throws Exception{
        Connection connection = RabbirnqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(F_QUEUE_NAME,false,false,false,null);
        channel.queueBind(F_QUEUE_NAME,EXCHANGE_NAME_FANOUT,"");

        channel.basicQos(5);//限制此通道预取数为5,当该队列没对接受到的消息进行回复时,不会再给其新的消息

        StringBuffer message = new StringBuffer();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println("接到消息1:"+message);
          channel.basicAck(envelope.getDeliveryTag(),true);//手动回复确认
    } }; 
    channel.basicConsume(F_QUEUE_NAME,false,defaultConsumer);//对接收到的消息自动回复确认
}

    重点解释 

      消费者消息确认方式分为:

        自动(交货无需确认,又称“开火即忘”)
        手动(送货需要客户确认)

    消息回复设置:channel.basicConsume(F_QUEUE_NAME,false,defaultConsumer);
            basicConsume(String var1, boolean var2, Consumer var3) throws IOException;
               
当var2为true,表示自动回复。
               为false表示手动回复,此时需要与
channel.basicQos()方法配合使用,限制消息交付数量,避免消费者超负荷
    channel.basicQos():限制此通道预取数,该值定义通道上允许的未确认交付的最大数量。通过使用basic.qos方法设置“预取计数”值。一旦数量达到配置的数量,RabbitMQ将停止在通道上传递更多消息,除非已确认至少一个未处理的消息。
                举例:现在通道里有1、2、3、4四个未确认交付的消息,队列channel里目前也有四个未确认的消息,且队列channel设置的预取值为2,现在chennel确认处理了其内部3个消息,那么就会再从通道取两个未确认的消息,因为其设置的预取值为2,即便channel里面还有一个空位也只会取2个消息
    手动回复确认:channel.basicAck();
           void basicAck(long var1, boolean var3) throws IOException;
           var1(
deliveryTag):该消息的id,即RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID
           var2(multiple):是否批量.true:将一次性处理所有小于等于传入值的所有消息  


  

  <二>、路由,direct,由路由键决定发到对应的队列,有几个和路由规则匹配的,就发几份数据去往队列

      1、生产者

     public void directPpro() throws Exception {
        Connection connection = RabbirnqConfig.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME_DIRECT,"direct");

        //创建队列
        //channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//        for (int i = 0; i < 20; i++) {
//            message = message + i;
//            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
//            Thread.sleep(1000);
//        }
        String message = "这是消息B";
        channel.basicPublish(EXCHANGE_NAME_DIRECT, "B", null, message.getBytes());
        String messageA = "这是消息A";
        channel.basicPublish(EXCHANGE_NAME_DIRECT, "A", null, messageA.getBytes());
        String messageC = "这是消息C";
        channel.basicPublish(EXCHANGE_NAME_DIRECT, "C", null, messageC.getBytes());
        System.out.println("生产者 send :"+message);
        System.out.println("生产者 send :"+messageA);
        System.out.println("生产者 send :"+messageC);
        channel.close();
        connection.close();
    }

      2、消费者

    public void directCus1() throws Exception{
        Connection connection = RabbirnqConfig.getConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.exchangeDeclare(EXCHANGE_NAME_DIRECT,"direct");
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME_DIRECT,"A");

        channel.basicQos(1);//当该队列没对接受到的消息进行回复时,不会再给其新的消息

        StringBuffer message = new StringBuffer();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println("接到消息A:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME1,true,defaultConsumer);//对接收到的消息自动回复确认
    }

  

public void directCus2() throws Exception{
        Connection connection = RabbirnqConfig.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME3,false,false,false,null);
        channel.exchangeDeclare(EXCHANGE_NAME_DIRECT,"direct");
        channel.queueBind(QUEUE_NAME3,EXCHANGE_NAME_DIRECT,"A");

        channel.basicQos(1);//限制此通道预取数为1,当该队列没对接受到的消息进行回复时,不会再给其新的消息

        StringBuffer message = new StringBuffer();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println("接到消息B:"+message);
                channel.basicAck(envelope.getDeliveryTag(),false);//手动回复确认
                //deliveryTag:该消息的index    multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                channel.basicReject(envelope.getDeliveryTag(),true);
            }
        };
        channel.basicConsume(QUEUE_NAME3,false,defaultConsumer);//对接收到的消息不自动回复确认,需手动回复
    }

  

  <三>、订阅,会把消息发到各订阅该主题的队列中,该队列连接后即可取到

     1、生产者

public void topicPro() throws Exception{
        Connection connection = RabbirnqConfig.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME_TOPIC,"topic");
        String message = "主题订阅";
        channel.basicPublish(EXCHANGE_NAME_TOPIC,"a.b",false,false,null,message.getBytes());
        System.out.println("已发送:"+message);
        channel.close();
        connection.close();
    }

      2、消费者

public void topicCus1() throws Exception{
        Connection connection = RabbirnqConfig.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("topic_cus1",false,false,false,null);
        channel.exchangeDeclare(EXCHANGE_NAME_TOPIC,"topic");
        channel.queueBind("topic_cus1",EXCHANGE_NAME_TOPIC,"a.*");
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                 throws IOException {
                     super.handleDelivery(consumerTag, envelope, properties, body);
                     System.out.println("t1: "+new String(body,"UTF-8"));
                 }
         };

        channel.basicConsume("topic_cus1",true,consumer);
    }

 

public void topicCus2() throws Exception{
        Connection connection = RabbirnqConfig.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("topic_cus2",false,false,false,null);
        channel.exchangeDeclare(EXCHANGE_NAME_TOPIC,"topic");
        channel.queueBind("topic_cus2",EXCHANGE_NAME_TOPIC,"a.*");
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println("T2:"+new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume("topic_cus2",false,consumer);
    }

  




RabbitMq

标签:stat   font   基于   ade   argument   ringbuf   root   dir   byte   

原文地址:https://www.cnblogs.com/Lk-skyhorse/p/13295261.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!