标签:rabbitmq
在RabbitMQ入门(一)里我们讲到exchange有三种最主要的类型:direct、fanout和topic。
这里我们先来看看最简单的direct交换器的使用。
下面是测试代码:
package com.jaeger.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static final String MY_EXCHANGE_NAME = "MyExchange"; private static final String MY_ROUTING_KEY = "MyRoutingKey"; private static final String MY_QUEUE_NAME = "MyQueue"; private static final String DIRECT = "direct"; private static final String HOST = "172.19.64.21"; private static final String USER = "jaeger"; private static final String PASSWORD = "root"; private static final int PORT = 5672; @Test public void createExchangeAndQueue() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建一个direct类型的exchange channel.exchangeDeclare(MY_EXCHANGE_NAME, DIRECT); // 创建一个queue channel.queueDeclare(MY_QUEUE_NAME, false, false, false, null); // 创建一个routing key,把exchange和queue绑定到一起 channel.queueBind(MY_QUEUE_NAME, MY_EXCHANGE_NAME, MY_ROUTING_KEY); channel.close(); connection.close(); } @Test public void produce() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String message = "Hello 世界!"; /* 向RabbitMQ发送消息。我们这里指定了exchange和routing key的名称,RabbitMQ会去找有没有叫这个名称的exchange, 如果找到了,就会再查看在该exchange上是否绑定一个跟我们指定名称一样的routing key,找到了就把消息放到routing key 对应的queue里面 */ channel.basicPublish(MY_EXCHANGE_NAME, MY_ROUTING_KEY, null, message.getBytes("utf-8")); System.out.println("Sent ‘" + message + "‘"); channel.close(); connection.close(); } @Test public void consume() throws IOException, TimeoutException, InterruptedException{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received ‘" + message + "‘"); } }; channel.basicConsume(MY_QUEUE_NAME, true, consumer); Thread.sleep(1000); } }
在讲direct交换器之前先简单介绍下RabbitMQ的connection和channel。
客户端到RabbitMQ的connection是通过TCP建立的,为什么不像数据库一样直接使用connection来操作呢?因为TCP的建立和销毁都非常消耗资源,对于一个消息服务器来说,它的任务就是处理海量的消息,如果每个消息的产生和消费都建立TCP连接的话显然不合适。
为了解决这个问题,引入了channel,channel相当于一个TCP连接内的虚拟连接。消息的产生和消费都是通过channel完成的,每个channel都会被指定一个唯一的ID用于区分不同的channel,以避免互相干扰。就好像光纤网线一样,网线中的每条光纤束都可以用来传递消息,而不会出现互相干扰
下面我们就来看看direct交换器的效果,先执行createExchangeAndQueue方法:
从后台可以看到,我们指定的exchange、routing key和queue都正确创建了,并且名称都是我们指定的名称。
接下来我们在运行produce方法向RabbitMQ发送消息:
可以看到MyQueue队列里面已经添加了一条消息。最后运行consume方法去消费这条消息:
对于消费端来说,只用知道queue的名称就可以了。而对于发送端,则需要知道exchange和routing key的名称,相对而言queue的名称就不那么重要了。
最后我们来分析下RabbitMQ入门(一)里面的send方法,该方法里面貌似并没有指定exchange和routing key的名称,也没有进行queue和exchange的绑定操作,为什么也能发送成功呢?下面是代码片段:
public void send() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //A String message = "Hello 世界!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8")); //B System.out.println("Sent ‘" + message + "‘"); channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
在A处我们创建了一个名称为QUEUE_NAME的queue,没有让其和任何的routing key对应,也没有声明任何exchange。
在B处我们发送消息的时候,指定的exchange是一个空字符串,routing key居然是queue的名称,不是说exchange是根据routing key来决定放入哪个queue的么,这里怎么用queue的名称?
对于上面的问题,下面我们来说明下:
RabbitMQ里面有一个默认的exchange,他的名称就是一个空字符串。我们创建的每一个queue都会跟这个exchange进行绑定(所以我们在通过自定义的routing key绑定exchange和queue时,实际上这个queue还偷偷绑定到了这个默认的exchange),而中间的routing key的名称就是queue的名称。所以上面B处的方法中的exchange就是这个默认的exchange,而routing key看起来好像就是queue的名称,实际上是因为routing key的名称跟queue名称一样而已。我们把上面produce方法修改下:
@Test public void produce() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String message = "Hello 世界!"; // 这里我们把消息发送给默认的exchange,routing key名称就是queue的名称 channel.basicPublish("", MY_QUEUE_NAME, null, message.getBytes("utf-8")); System.out.println("Sent ‘" + message + "‘"); channel.close(); connection.close(); }
消息成功通过默认的exchange和routing key放到了MyQueue队列里面。
本文出自 “銅鑼衛門” 博客,请务必保留此出处http://jaeger.blog.51cto.com/11064196/1762941
标签:rabbitmq
原文地址:http://jaeger.blog.51cto.com/11064196/1762941