码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ入门(三) —— fanout交换器

时间:2016-04-12 16:08:46      阅读:163      评论:0      收藏:0      [点我收藏+]

标签:rabbitmq

这篇文章主要介绍下fanout类型的exchange。fanout,顾名思义,就是像风扇吹面粉一样,吹得到处都是。如果使用fanout类型的exchange,那么routing key就不重要了。因为我们向exchange发送消息时用不着指定routing key,它会把消息给每个绑定到该exchange的queue发一份。

技术分享

package com.jaeger.exchange.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Producer {
	private static final String MY_EXCHANGE_NAME = "MyExchange";
	private static final String MY_ROUTING_KEY1 = "MyRoutingKey1";
	private static final String MY_QUEUE_NAME1 = "MyQueue1";
	private static final String MY_ROUTING_KEY2 = "MyRoutingKey2";
	private static final String MY_QUEUE_NAME2 = "MyQueue2";
	private static final String MY_ROUTING_KEY3 = "MyRoutingKey3";
	private static final String MY_QUEUE_NAME3 = "MyQueue3";
	private static final String FANOUT = "fanout";
	private static final String HOST = "172.19.64.21";
	private static final String USER = "jaeger";
	private static final String PASSWORD = "root";
	private static final int PORT = 5672;

	@Test
	public void createExchangeAndQueue() throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		// 创建一个fanout类型的exchange
		channel.exchangeDeclare(MY_EXCHANGE_NAME, FANOUT);
		// 创建三个queue
		channel.queueDeclare(MY_QUEUE_NAME1, false, false, false, null);
		channel.queueDeclare(MY_QUEUE_NAME2, false, false, false, null);
		channel.queueDeclare(MY_QUEUE_NAME3, false, false, false, null);
		// 创建三个routing key,把exchange和queue绑定到一起
		channel.queueBind(MY_QUEUE_NAME1, MY_EXCHANGE_NAME, MY_ROUTING_KEY1);
		channel.queueBind(MY_QUEUE_NAME2, MY_EXCHANGE_NAME, MY_ROUTING_KEY2);
		channel.queueBind(MY_QUEUE_NAME3, MY_EXCHANGE_NAME, MY_ROUTING_KEY3);
		channel.close();
		connection.close();
	}

	@Test
	public void produce() throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String message = "Hello 世界!";
		/*
		向RabbitMQ发送消息。我们这里指定了exchange和routing key的名称,RabbitMQ会去找有没有叫这个名称的exchange,
		如果找到了又发现这个exchange是fanout类型,就不会再去看routing key了,而是把消息放到所有绑定到这个exchange的queue里面。
		这里我们虽然指定了一个routing key,但实际上是没有任何效果的,我们还可以用空字符串,最后消息都是到达所有queue的。
		*/
		channel.basicPublish(MY_EXCHANGE_NAME, MY_ROUTING_KEY1, null, message.getBytes("utf-8"));
		//channel.basicPublish(MY_EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
		System.out.println("Sent ‘" + message + "‘");
		channel.close();
		connection.close();
	}
	
	@Test
	public void consume() throws IOException, TimeoutException, InterruptedException{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received ‘" + message + "‘");
			}
		};
		channel.basicConsume(MY_QUEUE_NAME1, true, consumer);
		Thread.sleep(1000);
	}
}

我们先运行createExchangeAndQueue,把三个queue绑定到一个fanout类型的exchange上:

技术分享

技术分享

技术分享

再运行produce方法,把消息发到exchange让其转发:

技术分享

我们看到虽然我们指定了一个routing key,但实际上没什么用,也可以用""代替,消息会发到每一个queue里面。

最后我们运行consume方法,让它去消费MyQueue1队列里面的消息:

技术分享

可以看到MyQueue1里面的消息被消费掉了。


本文出自 “銅鑼衛門” 博客,请务必保留此出处http://jaeger.blog.51cto.com/11064196/1762983

RabbitMQ入门(三) —— fanout交换器

标签:rabbitmq

原文地址:http://jaeger.blog.51cto.com/11064196/1762983

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!