码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ入门教程(六):路由选择Routing

时间:2019-11-15 13:51:48      阅读:82      评论:0      收藏:0      [点我收藏+]

标签:copyright   tco   ror   oda   utf-8   rdb   amqp   error   dom   

原文:RabbitMQ入门教程(六):路由选择Routing

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/vbirdbest/article/details/78629168
分享一个朋友的人工智能教程。比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看。

简介

本节主要演示使用直连接类型,将多个路由键绑定到同一个队列上。也可以将同一个键绑定到多个队列上(多重绑定multiple bindings),此时满足键的队列都能收到消息,不满足的直接被丢弃。

技术图片

技术图片

生产者

public class Producer {
    @Test
    public void testBasicPublish() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Routing 的路由规则使用直连接
        String EXCHANGE_NAME = "exchange.direct.routing";
        String[] routingKeys = {"debug", "info", "warning", "error"};
        for (int i = 0; i < 20; i++){
            int random = (int)(Math.random() * 4);
            String routingKey = routingKeys[random];
            String message = "Hello RabbitMQ - " + routingKey + " - " + i;

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        }

        // 关闭资源
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

消费者1

public class Consumer1 {
    @Test
    public void testBasicConsumer1() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "exchange.direct.routing";
        // 生成一个随机的名称,queueDeclare()方法没有任何参数,当最后一个消费者断开时就会删除掉该队列,当消费者结束后可以看到队列就删除了
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 在消费者端队列绑定

        // 将一个对列绑定多个路由键
        String[] routingKeys = {"debug", "info"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received ‘" + message + "‘, 处理业务中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

消费者2

public class Consumer2 {
    @Test
    public void testBasicConsumer2() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();


        String EXCHANGE_NAME = "exchange.direct.routing";
        // 生成一个随机的名称
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 在消费者端队列绑定

        // 将一个对列绑定多个路由键
        String[] routingKeys = {"warning", "error"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received ‘" + message + "‘, 处理业务中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

运行结果

技术图片

技术图片

技术图片

技术图片

分享一个朋友的人工智能教程。比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看。

RabbitMQ入门教程(六):路由选择Routing

标签:copyright   tco   ror   oda   utf-8   rdb   amqp   error   dom   

原文地址:https://www.cnblogs.com/lonelyxmas/p/11865786.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!