码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ六中工作模式-路由模式

时间:2020-09-10 23:00:53      阅读:54      评论:0      收藏:0      [点我收藏+]

标签:basic   getenv   系统   word   rabbitmq   gety   有一个   except   ann   

RabbitMQ 路由模式

技术图片

技术图片

在发布和订阅模式中,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。

在路由模式,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定 Bindings

在发布和订阅模式中,我们已经创建了队列与交换机的绑定。使用下面这样的代码:

ch.queueBind(queueName, "logs", "");

绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

ch.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

直连交换机 Direct exchange

上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。

前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置

技术图片

其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有blackgreen路由键的消息将转到Q2。而所有其他消息都将被丢弃。

多重绑定 Multiple bindings

技术图片

使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

发送日志

我们将在日志系统中使用这个模型。我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。

和前面一样,我们首先需要创建一个exchange:

//参数1: 交换机名
//参数2: 交换机类型
ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);

接着来看发送消息的代码

//参数1: 交换机名
//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
//参数3: 其他配置属性
//参数4: 发布的消息数据 
ch.basicPublish("direct_logs", "error", null, message.getBytes());

订阅

接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:

ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");

代码

生产者

package rabbitmq.routing;

import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		String[] a = {"warning", "info", "error"};
		
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//参数1: 交换机名
		//参数2: 交换机类型
		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
		
		while (true) {
			System.out.print("输入消息: ");
			String msg = new Scanner(System.in).nextLine();
			if ("exit".equals(msg)) {
				break;
			}
			
			//随机产生日志级别
			String level = a[new Random().nextInt(a.length)];
			
			//参数1: 交换机名
			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
			//参数3: 其他配置属性
			//参数4: 发布的消息数据 
			ch.basicPublish("direct_logs", level, null, msg.getBytes());
			System.out.println("消息已发送: "+level+" - "+msg);
			
		}

		c.close();
	}
}

消费者

package rabbitmq.routing;

import java.io.IOException;
import java.util.Scanner;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//定义名字为 direct_logs 的交换机, 它的类型是 "direct"
		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
		
		//自动生成对列名,
		//非持久,独占,自动删除
		String queueName = ch.queueDeclare().getQueue();
		
		System.out.println("输入接收的日志级别,用空格隔开:");
		String[] a = new Scanner(System.in).nextLine().split("\\s");
		
		//把该队列,绑定到 direct_logs 交换机
		//允许使用多个 bindingKey
		for (String level : a) {
			ch.queueBind(queueName, "direct_logs", level);
		}
		
		System.out.println("等待接收数据");
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				String routingKey = message.getEnvelope().getRoutingKey();
				System.out.println("收到: "+routingKey+" - "+msg);
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume(queueName, true, callback, cancel);
	}
}

RabbitMQ六中工作模式-路由模式

标签:basic   getenv   系统   word   rabbitmq   gety   有一个   except   ann   

原文地址:https://www.cnblogs.com/zpKang/p/13584729.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!